You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/10/05 02:52:03 UTC
svn commit: r1004490 - in /hadoop/pig/branches/branch-0.8: ./
src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/newplan/logical/
src/org/apache/pig/newplan/logical/optimizer/
src/org/apache/pig/newplan/logical/re...
Author: daijy
Date: Tue Oct 5 00:52:02 2010
New Revision: 1004490
URL: http://svn.apache.org/viewvc?rev=1004490&view=rev
Log:
PIG-1659: sortinfo is not set for store if there is a filter after ORDER BY
Added:
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java
Modified:
hadoop/pig/branches/branch-0.8/CHANGES.txt
hadoop/pig/branches/branch-0.8/src/org/apache/pig/PigServer.java
hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOStore.java
hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestLogToPhyCompiler.java
Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/CHANGES.txt?rev=1004490&r1=1004489&r2=1004490&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.8/CHANGES.txt Tue Oct 5 00:52:02 2010
@@ -200,6 +200,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
BUG FIXES
+PIG-1659: sortinfo is not set for store if there is a filter after ORDER BY (daijy)
+
PIG-1664: leading '_' in directory/file names should be ignored; the "pigtest" build target should include all pig-related zebra tests. (yanz)
PIG-1662: Need better error message for MalFormedProbVecException (rding)
Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/PigServer.java?rev=1004490&r1=1004489&r2=1004490&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/PigServer.java Tue Oct 5 00:52:02 2010
@@ -1332,15 +1332,15 @@ public class PigServer {
LogicalOptimizer optimizer = new LogicalOptimizer(lp, pigContext.getExecType(), optimizerRules);
optimizer.optimize();
+
+ // compute whether output data is sorted or not
+ SortInfoSetter sortInfoSetter = new SortInfoSetter(lp);
+ sortInfoSetter.visit();
+
+ // run validations to be done after optimization
+ isBeforeOptimizer = false;
+ validate(lp, collector, isBeforeOptimizer);
}
-
- // compute whether output data is sorted or not
- SortInfoSetter sortInfoSetter = new SortInfoSetter(lp);
- sortInfoSetter.visit();
-
- // run validations to be done after optimization
- isBeforeOptimizer = false;
- validate(lp, collector, isBeforeOptimizer);
return lp;
}
Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1004490&r1=1004489&r2=1004490&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Tue Oct 5 00:52:02 2010
@@ -41,6 +41,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
+import org.apache.pig.SortInfo;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
@@ -61,8 +62,21 @@ import org.apache.pig.impl.plan.NodeIdGe
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Utils;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
+import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplit;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.newplan.logical.rules.InputOutputFileValidator;
+import org.apache.pig.newplan.optimizer.Rule;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
@@ -261,6 +275,18 @@ public class HExecutionEngine {
new org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer(newPlan, 100, optimizerRules);
optimizer.optimize();
+ // compute whether output data is sorted or not
+ SortInfoSetter sortInfoSetter = new SortInfoSetter(newPlan);
+ sortInfoSetter.visit();
+
+ if (pigContext.inExplain==false) {
+ // Validate input/output file. Currently no validation framework in
+ // new logical plan, put this validator here first.
+ // We might decide to move it out to a validator framework in future
+ InputOutputFileValidator validator = new InputOutputFileValidator(newPlan, pigContext);
+ validator.validate();
+ }
+
// translate new logical plan to physical plan
org.apache.pig.newplan.logical.relational.LogToPhyTranslationVisitor translator =
new org.apache.pig.newplan.logical.relational.LogToPhyTranslationVisitor(newPlan);
@@ -282,6 +308,56 @@ public class HExecutionEngine {
throw new ExecException(msg, errCode, PigException.BUG, ve);
}
}
+
+ public static class SortInfoSetter extends LogicalRelationalNodesVisitor {
+
+ public SortInfoSetter(OperatorPlan plan) throws FrontendException {
+ super(plan, new DependencyOrderWalker(plan));
+ }
+
+ @Override
+ public void visit(LOStore store) throws FrontendException {
+
+ Operator storePred = store.getPlan().getPredecessors(store).get(0);
+ if(storePred == null){
+ int errCode = 2051;
+ String msg = "Did not find a predecessor for Store." ;
+ throw new FrontendException(msg, errCode, PigException.BUG);
+ }
+
+ SortInfo sortInfo = null;
+ if(storePred instanceof LOLimit) {
+ storePred = store.getPlan().getPredecessors(storePred).get(0);
+ } else if (storePred instanceof LOSplitOutput) {
+ LOSplitOutput splitOutput = (LOSplitOutput)storePred;
+ // We assume this is the LOSplitOutput we injected for this case:
+ // b = order a by $0; store b into '1'; store b into '2';
+ // In this case, we should mark both '1' and '2' as sorted
+ LogicalExpressionPlan conditionPlan = splitOutput.getFilterPlan();
+ if (conditionPlan.getSinks().size()==1) {
+ Operator root = conditionPlan.getSinks().get(0);
+ if (root instanceof ConstantExpression) {
+ Object value = ((ConstantExpression)root).getValue();
+ if (value instanceof Boolean && (Boolean)value==true) {
+ Operator split = splitOutput.getPlan().getPredecessors(splitOutput).get(0);
+ if (split instanceof LOSplit)
+ storePred = store.getPlan().getPredecessors(split).get(0);
+ }
+ }
+ }
+ }
+ // if this predecessor is a sort, get
+ // the sort info.
+ if(storePred instanceof LOSort) {
+ try {
+ sortInfo = ((LOSort)storePred).getSortInfo();
+ } catch (FrontendException e) {
+ throw new FrontendException(e);
+ }
+ }
+ store.setSortInfo(sortInfo);
+ }
+ }
public List<ExecJob> execute(PhysicalPlan plan,
String jobName) throws ExecException, FrontendException {
Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java?rev=1004490&r1=1004489&r2=1004490&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java (original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java Tue Oct 5 00:52:02 2010
@@ -362,7 +362,6 @@ public class LogicalPlanMigrationVistor
newStore.setRequestedParallelism(store.getRequestedParallelism());
newStore.setSignature(store.getSignature());
newStore.setInputSpec(store.getInputSpec());
- newStore.setSortInfo(store.getSortInfo());
newStore.setTmpStore(store.isTmpStore());
logicalPlan.add(newStore);
Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=1004490&r1=1004489&r2=1004490&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Tue Oct 5 00:52:02 2010
@@ -28,6 +28,7 @@ import org.apache.pig.newplan.logical.ru
import org.apache.pig.newplan.logical.rules.FilterAboveForeach;
import org.apache.pig.newplan.logical.rules.GroupByConstParallelSetter;
import org.apache.pig.newplan.logical.rules.ImplicitSplitInserter;
+import org.apache.pig.newplan.logical.rules.InputOutputFileValidator;
import org.apache.pig.newplan.logical.rules.LimitOptimizer;
import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
import org.apache.pig.newplan.logical.rules.LogicalExpressionSimplifier;
Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOStore.java?rev=1004490&r1=1004489&r2=1004490&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOStore.java (original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOStore.java Tue Oct 5 00:52:02 2010
@@ -125,5 +125,6 @@ public class LOStore extends LogicalRela
public void setSignature(String sig) {
signature = sig;
+ storeFunc.setStoreFuncUDFContextSignature(signature);
}
}
Added: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java?rev=1004490&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java (added)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java Tue Oct 5 00:52:02 2010
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.rules;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.DepthFirstWalker;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanWalker;
+import org.apache.pig.newplan.logical.Util;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.rules.GroupByConstParallelSetter.GroupAllParallelSetterTransformer;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.optimizer.Transformer;
+
+public class InputOutputFileValidator {
+ private PigContext pigCtx;
+ OperatorPlan plan;
+ public InputOutputFileValidator(OperatorPlan plan, PigContext pigContext) {
+ pigCtx = pigContext;
+ this.plan = plan;
+ }
+
+ public void validate() throws FrontendException {
+ InputOutputFileVisitor visitor = new InputOutputFileVisitor(plan);
+ visitor.visit();
+ }
+
+ class InputOutputFileVisitor extends LogicalRelationalNodesVisitor {
+
+ protected InputOutputFileVisitor(OperatorPlan plan)
+ throws FrontendException {
+ super(plan, new DepthFirstWalker(plan));
+ }
+
+ @Override
+ public void visit(LOStore store) throws FrontendException {
+ StoreFuncInterface sf = store.getStoreFunc();
+ String outLoc = store.getOutputSpec().getFileName();
+ int errCode = 2116;
+ String validationErrStr ="Output Location Validation Failed for: '" + outLoc ;
+ Job dummyJob;
+
+ try {
+ if(store.getSchema() != null){
+ sf.checkSchema(new ResourceSchema(Util.translateSchema(store.getSchema()), store.getSortInfo()));
+ }
+ dummyJob = new Job(ConfigurationUtil.toConfiguration(pigCtx.getProperties()));
+ sf.setStoreLocation(outLoc, dummyJob);
+ } catch (IOException ioe) {
+ if(ioe instanceof PigException){
+ errCode = ((PigException)ioe).getErrorCode();
+ }
+ String exceptionMsg = ioe.getMessage();
+ validationErrStr += (exceptionMsg == null) ? "" : " More info to follow:\n" +exceptionMsg;
+ throw new FrontendException(validationErrStr, errCode, pigCtx.getErrorSource(), ioe);
+ }
+
+ validationErrStr += " More info to follow:\n";
+ try {
+ sf.getOutputFormat().checkOutputSpecs(dummyJob);
+ } catch (IOException ioe) {
+ byte errSrc = pigCtx.getErrorSource();
+ switch(errSrc) {
+ case PigException.BUG:
+ errCode = 2002;
+ break;
+ case PigException.REMOTE_ENVIRONMENT:
+ errCode = 6000;
+ break;
+ case PigException.USER_ENVIRONMENT:
+ errCode = 4000;
+ break;
+ }
+ validationErrStr += ioe.getMessage();
+ throw new FrontendException(validationErrStr, errCode, errSrc, ioe);
+ } catch (InterruptedException ie) {
+ validationErrStr += ie.getMessage();
+ throw new FrontendException(validationErrStr, errCode, pigCtx.getErrorSource(), ie);
+ }
+ }
+ }
+}
Modified: hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestLogToPhyCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestLogToPhyCompiler.java?rev=1004490&r1=1004489&r2=1004490&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestLogToPhyCompiler.java (original)
+++ hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestLogToPhyCompiler.java Tue Oct 5 00:52:02 2010
@@ -703,7 +703,6 @@ public class TestLogToPhyCompiler extend
@Test
public void testSortInfoMultipleStore() throws Exception {
PigServer myPig = new PigServer(ExecType.LOCAL);
- myPig.getPigContext().getProperties().setProperty("pig.usenewlogicalplan", "false");
myPig.setBatchOn();
myPig.registerQuery("a = load 'bla' as (i:int, n:chararray, d:double);");
myPig.registerQuery("b = order a by i, d desc;");
@@ -719,7 +718,13 @@ public class TestLogToPhyCompiler extend
LOPrinter lpr = new LOPrinter(System.err, lp);
lpr.visit();
- PhysicalPlan pp = buildPhysicalPlan(lp);
+ java.lang.reflect.Method compilePp = myPig.getClass()
+ .getDeclaredMethod("compilePp",
+ new Class[] { LogicalPlan.class });
+
+ compilePp.setAccessible(true);
+
+ PhysicalPlan pp = (PhysicalPlan) compilePp.invoke(myPig, new Object[] { lp });
SortInfo si0 = ((POStore)(pp.getLeaves().get(0))).getSortInfo();
SortInfo si1 = ((POStore)(pp.getLeaves().get(1))).getSortInfo();
SortInfo expected = getSortInfo(