You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/10/05 02:52:03 UTC

svn commit: r1004490 - in /hadoop/pig/branches/branch-0.8: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/newplan/logical/ src/org/apache/pig/newplan/logical/optimizer/ src/org/apache/pig/newplan/logical/re...

Author: daijy
Date: Tue Oct  5 00:52:02 2010
New Revision: 1004490

URL: http://svn.apache.org/viewvc?rev=1004490&view=rev
Log:
PIG-1659: sortinfo is not set for store if there is a filter after ORDER BY

Added:
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java
Modified:
    hadoop/pig/branches/branch-0.8/CHANGES.txt
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/PigServer.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOStore.java
    hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestLogToPhyCompiler.java

Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/CHANGES.txt?rev=1004490&r1=1004489&r2=1004490&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.8/CHANGES.txt Tue Oct  5 00:52:02 2010
@@ -200,6 +200,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1659: sortinfo is not set for store if there is a filter after ORDER BY (daijy)
+
 PIG-1664: leading '_' in directory/file names should be ignored; the "pigtest" build target should include all pig-related zebra tests. (yanz)
 
 PIG-1662: Need better error message for MalFormedProbVecException (rding)

Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/PigServer.java?rev=1004490&r1=1004489&r2=1004490&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/PigServer.java Tue Oct  5 00:52:02 2010
@@ -1332,15 +1332,15 @@ public class PigServer {
 
             LogicalOptimizer optimizer = new LogicalOptimizer(lp, pigContext.getExecType(), optimizerRules);
             optimizer.optimize();
+            
+            // compute whether output data is sorted or not
+            SortInfoSetter sortInfoSetter = new SortInfoSetter(lp);
+            sortInfoSetter.visit();
+            
+            // run validations to be done after optimization
+            isBeforeOptimizer = false;
+            validate(lp, collector, isBeforeOptimizer);
         }
-
-        // compute whether output data is sorted or not
-        SortInfoSetter sortInfoSetter = new SortInfoSetter(lp);
-        sortInfoSetter.visit();
-        
-        // run validations to be done after optimization
-        isBeforeOptimizer = false;
-        validate(lp, collector, isBeforeOptimizer);
         
         return lp;
     }

Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1004490&r1=1004489&r2=1004490&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Tue Oct  5 00:52:02 2010
@@ -41,6 +41,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
+import org.apache.pig.SortInfo;
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
@@ -61,8 +62,21 @@ import org.apache.pig.impl.plan.NodeIdGe
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Utils;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
 import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
+import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplit;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.newplan.logical.rules.InputOutputFileValidator;
+import org.apache.pig.newplan.optimizer.Rule;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
 
@@ -261,6 +275,18 @@ public class HExecutionEngine {
                     new org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer(newPlan, 100, optimizerRules);
                 optimizer.optimize();
                 
+                // compute whether output data is sorted or not
+                SortInfoSetter sortInfoSetter = new SortInfoSetter(newPlan);
+                sortInfoSetter.visit();
+                
+                if (pigContext.inExplain==false) {
+                    // Validate input/output files. The new logical plan has no validation
+                    // framework yet, so the validator is invoked here for now.
+                    // It may be moved into a validator framework in the future.
+                    InputOutputFileValidator validator = new InputOutputFileValidator(newPlan, pigContext);
+                    validator.validate();
+                }
+                
                 // translate new logical plan to physical plan
                 org.apache.pig.newplan.logical.relational.LogToPhyTranslationVisitor translator = 
                     new org.apache.pig.newplan.logical.relational.LogToPhyTranslationVisitor(newPlan);
@@ -282,6 +308,56 @@ public class HExecutionEngine {
             throw new ExecException(msg, errCode, PigException.BUG, ve);
         }
     }
+    
+    public static class SortInfoSetter extends LogicalRelationalNodesVisitor {
+
+        public SortInfoSetter(OperatorPlan plan) throws FrontendException {
+            super(plan, new DependencyOrderWalker(plan));
+        }
+
+        @Override
+        public void visit(LOStore store) throws FrontendException {
+            
+            Operator storePred = store.getPlan().getPredecessors(store).get(0);
+            if(storePred == null){
+                int errCode = 2051;
+                String msg = "Did not find a predecessor for Store." ;
+                throw new FrontendException(msg, errCode, PigException.BUG);    
+            }
+            
+            SortInfo sortInfo = null;
+            if(storePred instanceof LOLimit) {
+                storePred = store.getPlan().getPredecessors(storePred).get(0);
+            } else if (storePred instanceof LOSplitOutput) {
+                LOSplitOutput splitOutput = (LOSplitOutput)storePred;
+                // We assume this is the LOSplitOutput we injected for this case:
+                // b = order a by $0; store b into '1'; store b into '2';
+                // In this case, we should mark both '1' and '2' as sorted
+                LogicalExpressionPlan conditionPlan = splitOutput.getFilterPlan();
+                if (conditionPlan.getSinks().size()==1) {
+                    Operator root = conditionPlan.getSinks().get(0);
+                    if (root instanceof ConstantExpression) {
+                        Object value = ((ConstantExpression)root).getValue();
+                        if (value instanceof Boolean && (Boolean)value==true) {
+                            Operator split = splitOutput.getPlan().getPredecessors(splitOutput).get(0);
+                            if (split instanceof LOSplit)
+                                storePred = store.getPlan().getPredecessors(split).get(0);
+                        }
+                    }
+                }
+            }
+            // if this predecessor is a sort, get
+            // the sort info.
+            if(storePred instanceof LOSort) {
+                try {
+                    sortInfo = ((LOSort)storePred).getSortInfo();
+                } catch (FrontendException e) {
+                    throw new FrontendException(e);
+                }
+            }
+            store.setSortInfo(sortInfo);
+        }
+    }
 
     public List<ExecJob> execute(PhysicalPlan plan,
                                  String jobName) throws ExecException, FrontendException {

Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java?rev=1004490&r1=1004489&r2=1004490&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java (original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java Tue Oct  5 00:52:02 2010
@@ -362,7 +362,6 @@ public class LogicalPlanMigrationVistor 
         newStore.setRequestedParallelism(store.getRequestedParallelism());
         newStore.setSignature(store.getSignature());
         newStore.setInputSpec(store.getInputSpec());
-        newStore.setSortInfo(store.getSortInfo());
         newStore.setTmpStore(store.isTmpStore());
         
         logicalPlan.add(newStore);

Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=1004490&r1=1004489&r2=1004490&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Tue Oct  5 00:52:02 2010
@@ -28,6 +28,7 @@ import org.apache.pig.newplan.logical.ru
 import org.apache.pig.newplan.logical.rules.FilterAboveForeach;
 import org.apache.pig.newplan.logical.rules.GroupByConstParallelSetter;
 import org.apache.pig.newplan.logical.rules.ImplicitSplitInserter;
+import org.apache.pig.newplan.logical.rules.InputOutputFileValidator;
 import org.apache.pig.newplan.logical.rules.LimitOptimizer;
 import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
 import org.apache.pig.newplan.logical.rules.LogicalExpressionSimplifier;

Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOStore.java?rev=1004490&r1=1004489&r2=1004490&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOStore.java (original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOStore.java Tue Oct  5 00:52:02 2010
@@ -125,5 +125,6 @@ public class LOStore extends LogicalRela
     
     public void setSignature(String sig) {
         signature = sig;
+        storeFunc.setStoreFuncUDFContextSignature(signature);
     }
 }

Added: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java?rev=1004490&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java (added)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java Tue Oct  5 00:52:02 2010
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.rules;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.DepthFirstWalker;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanWalker;
+import org.apache.pig.newplan.logical.Util;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.rules.GroupByConstParallelSetter.GroupAllParallelSetterTransformer;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.optimizer.Transformer;
+
+public class InputOutputFileValidator {
+    private PigContext pigCtx;
+    OperatorPlan plan;
+    public InputOutputFileValidator(OperatorPlan plan, PigContext pigContext) {
+        pigCtx = pigContext;
+        this.plan = plan;
+    }
+
+    public void validate() throws FrontendException {
+        InputOutputFileVisitor visitor = new InputOutputFileVisitor(plan);
+        visitor.visit();
+    }
+    
+    class InputOutputFileVisitor extends LogicalRelationalNodesVisitor {
+
+        protected InputOutputFileVisitor(OperatorPlan plan)
+                throws FrontendException {
+            super(plan, new DepthFirstWalker(plan));
+        }
+
+        @Override
+        public void visit(LOStore store) throws FrontendException {
+            StoreFuncInterface sf = store.getStoreFunc();
+            String outLoc = store.getOutputSpec().getFileName();
+            int errCode = 2116;
+            String validationErrStr ="Output Location Validation Failed for: '" + outLoc ;
+            Job dummyJob;
+            
+            try {
+                if(store.getSchema() != null){
+                    sf.checkSchema(new ResourceSchema(Util.translateSchema(store.getSchema()), store.getSortInfo()));                
+                }
+                dummyJob = new Job(ConfigurationUtil.toConfiguration(pigCtx.getProperties()));
+                sf.setStoreLocation(outLoc, dummyJob);
+            } catch (IOException ioe) {
+                if(ioe instanceof PigException){
+                    errCode = ((PigException)ioe).getErrorCode();
+                } 
+                String exceptionMsg = ioe.getMessage();
+                validationErrStr += (exceptionMsg == null) ? "" : " More info to follow:\n" +exceptionMsg;
+                throw new FrontendException(validationErrStr, errCode, pigCtx.getErrorSource(), ioe);
+            }
+            
+            validationErrStr += " More info to follow:\n";
+            try {
+                sf.getOutputFormat().checkOutputSpecs(dummyJob);
+            } catch (IOException ioe) {
+                byte errSrc = pigCtx.getErrorSource();
+                switch(errSrc) {
+                case PigException.BUG:
+                    errCode = 2002;
+                    break;
+                case PigException.REMOTE_ENVIRONMENT:
+                    errCode = 6000;
+                    break;
+                case PigException.USER_ENVIRONMENT:
+                    errCode = 4000;
+                    break;
+                }
+                validationErrStr  += ioe.getMessage();
+                throw new FrontendException(validationErrStr, errCode, errSrc, ioe);
+            } catch (InterruptedException ie) {
+                validationErrStr += ie.getMessage();
+                throw new FrontendException(validationErrStr, errCode, pigCtx.getErrorSource(), ie);
+            }
+        }
+    }
+}

Modified: hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestLogToPhyCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestLogToPhyCompiler.java?rev=1004490&r1=1004489&r2=1004490&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestLogToPhyCompiler.java (original)
+++ hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestLogToPhyCompiler.java Tue Oct  5 00:52:02 2010
@@ -703,7 +703,6 @@ public class TestLogToPhyCompiler extend
     @Test
     public void testSortInfoMultipleStore() throws Exception {
         PigServer myPig = new PigServer(ExecType.LOCAL);
-        myPig.getPigContext().getProperties().setProperty("pig.usenewlogicalplan", "false");
         myPig.setBatchOn();
         myPig.registerQuery("a = load 'bla' as (i:int, n:chararray, d:double);");
         myPig.registerQuery("b = order a by i, d desc;");
@@ -719,7 +718,13 @@ public class TestLogToPhyCompiler extend
         LOPrinter lpr = new LOPrinter(System.err, lp);
         lpr.visit();
         
-        PhysicalPlan pp = buildPhysicalPlan(lp);
+        java.lang.reflect.Method compilePp = myPig.getClass()
+            .getDeclaredMethod("compilePp",
+            new Class[] { LogicalPlan.class });
+        
+        compilePp.setAccessible(true);
+        
+        PhysicalPlan pp = (PhysicalPlan) compilePp.invoke(myPig, new Object[] { lp });
         SortInfo si0 = ((POStore)(pp.getLeaves().get(0))).getSortInfo();
         SortInfo si1 = ((POStore)(pp.getLeaves().get(1))).getSortInfo();
         SortInfo expected = getSortInfo(