You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@pig.apache.org by da...@apache.org on 2010/01/20 01:53:06 UTC

svn commit: r901019 - in /hadoop/pig/branches/branch-0.6: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java test/org/apache/pig/test/TestSecondarySort.java

Author: daijy
Date: Wed Jan 20 00:53:05 2010
New Revision: 901019

URL: http://svn.apache.org/viewvc?rev=901019&view=rev
Log:
PIG-1193: Secondary sort issue on nested desc sort

Modified:
    hadoop/pig/branches/branch-0.6/CHANGES.txt
    hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java
    hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSecondarySort.java

Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/CHANGES.txt?rev=901019&r1=901018&r2=901019&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.6/CHANGES.txt Wed Jan 20 00:53:05 2010
@@ -270,6 +270,8 @@
 
 PIG-1186: Pig do not take values in "pig-cluster-hadoop-site.xml" (daijy)
 
+PIG-1193: Secondary sort issue on nested desc sort (daijy)
+
 Release 0.5.0
 
 INCOMPATIBLE CHANGES

Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java?rev=901019&r1=901018&r2=901019&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java (original)
+++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java Wed Jan 20 00:53:05 2010
@@ -29,7 +29,6 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
@@ -43,7 +42,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -245,17 +244,22 @@
         for (PhysicalPlan innerPlan : foreach.getInputPlans()) {
             // visit inner plans to figure out the sort order for distinct /
             // sort
-            SecondaryKeyDiscoverVisitor innerPlanVisitor = new SecondaryKeyDiscoverVisitor(
+            SecondaryKeyDiscover innerPlanDiscover = new SecondaryKeyDiscover(
                     innerPlan, sortKeyInfos, secondarySortKeyInfo);
-            innerPlanVisitor.visit();
-            secondarySortKeyInfo = innerPlanVisitor.getSecondarySortKeyInfo();
-            if (innerPlanVisitor.getSortsToRemove() != null) {
-                for (POSort sort : innerPlanVisitor.getSortsToRemove()) {
+            try {
+                innerPlanDiscover.process();
+            } catch (FrontendException e) {
+                int errorCode = 2213;
+                throw new VisitorException("Error visiting inner plan for ForEach", errorCode, e);
+            }
+            secondarySortKeyInfo = innerPlanDiscover.getSecondarySortKeyInfo();
+            if (innerPlanDiscover.getSortsToRemove() != null) {
+                for (POSort sort : innerPlanDiscover.getSortsToRemove()) {
                     sortsToRemove.add(new POToChange(sort, innerPlan, foreach));
                 }
             }
-            if (innerPlanVisitor.getDistinctsToChange() != null) {
-                for (PODistinct distinct : innerPlanVisitor
+            if (innerPlanDiscover.getDistinctsToChange() != null) {
+                for (PODistinct distinct : innerPlanDiscover
                         .getDistinctsToChange()) {
                     distinctsToChange.add(new POToChange(distinct, innerPlan,
                             foreach));
@@ -289,10 +293,14 @@
                 String scope = oldSort.getOperatorKey().scope;
                 List<PhysicalOperator> preds = sortToRemove.plan
                         .getPredecessors(sortToRemove.oper);
+                List<PhysicalOperator> succs = sortToRemove.plan
+                .getSuccessors(sortToRemove.oper);
                 POProject project = null;
-                if (preds == null
+                if ((preds == null
                         || preds.get(0).getResultType() != DataType.BAG
-                        && oldSort.getResultType() == DataType.BAG) {
+                        && oldSort.getResultType() == DataType.BAG) // sort to remove do change the result type
+                        && (succs == null || !(succs.get(0) instanceof PORelationToExprProject))) // successor is not PORelationToExprProject
+                {
                     project = new PORelationToExprProject(new OperatorKey(
                             scope, NodeIdGenerator.getGenerator()
                                     .getNextNodeId(scope)), oldSort
@@ -351,7 +359,7 @@
                 }
                 if (!found)
                 {
-                    int errorCode = 2209;
+                    int errorCode = 2214;
                     new VisitorException("Cannot find POLocalRearrange to set secondary plan", errorCode);
                 }
             }
@@ -429,7 +437,9 @@
     // we cannot do any secondary key optimization because we only have 1
     // secondary
     // sort key.
-    private static class SecondaryKeyDiscoverVisitor extends PhyPlanVisitor {
+    private static class SecondaryKeyDiscover {
+        PhysicalPlan mPlan;
+        
         List<POSort> sortsToRemove = new ArrayList<POSort>();
 
         List<PODistinct> distinctsToChange = new ArrayList<PODistinct>();
@@ -438,25 +448,60 @@
 
         SortKeyInfo secondarySortKeyInfo;
 
-        ColumnChainInfo columnChainInfo = new ColumnChainInfo();
-
-        boolean sawInvalidPhysicalOper = false;
+        ColumnChainInfo columnChainInfo = null;
 
         // PhysicalPlan here is foreach inner plan
-        SecondaryKeyDiscoverVisitor(PhysicalPlan plan,
+        SecondaryKeyDiscover(PhysicalPlan plan,
                 List<SortKeyInfo> sortKeyInfos, SortKeyInfo secondarySortKeyInfo) {
-            super(plan,
-                    new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(
-                            plan));
+            this.mPlan = plan;
             this.sortKeyInfos = sortKeyInfos;
             this.secondarySortKeyInfo = secondarySortKeyInfo;
         }
+        
+        public void process() throws FrontendException
+        {
+            List<PhysicalOperator> roots = mPlan.getRoots();
+            for (PhysicalOperator root : roots) {
+                columnChainInfo = new ColumnChainInfo();
+                processRoot(root);
+            }
+        }
+        
+        public void processRoot(PhysicalOperator root) throws FrontendException {
+            PhysicalOperator currentNode = root;
+            while (currentNode!=null) {
+                boolean sawInvalidPhysicalOper = false;
+                if (currentNode instanceof PODistinct)
+                    sawInvalidPhysicalOper = processDistinct((PODistinct)currentNode);
+                else if (currentNode instanceof POSort)
+                    sawInvalidPhysicalOper = processSort((POSort)currentNode);
+                else if (currentNode instanceof POProject)
+                    sawInvalidPhysicalOper = processProject((POProject)currentNode);
+                else if (currentNode instanceof POForEach)
+                    sawInvalidPhysicalOper = processForEach((POForEach)currentNode);
+                else if (currentNode instanceof POUserFunc ||
+                         currentNode instanceof POUnion)
+                    break;
+                
+                if (sawInvalidPhysicalOper)
+                    break;
+                
+                List<PhysicalOperator> succs = mPlan.getSuccessors(currentNode);
+                if (succs==null)
+                    currentNode = null;
+                else {
+                    if (succs.size()>1) {
+                        int errorCode = 2215;
+                        throw new FrontendException("See more than 1 successors in the nested plan for "+currentNode,
+                                errorCode);
+                    }
+                    currentNode = succs.get(0);
+                }
+            }
+        }
 
         // We see PODistinct, check which key it is using
-        @Override
-        public void visitDistinct(PODistinct distinct) throws VisitorException {
-            if (sawInvalidPhysicalOper)
-                return;
+        public boolean processDistinct(PODistinct distinct) throws FrontendException {
             SortKeyInfo keyInfos = new SortKeyInfo();
             try {
                 keyInfos.insertColumnChainInfo(0,
@@ -469,7 +514,7 @@
             for (SortKeyInfo sortKeyInfo : sortKeyInfos) {
                 if (sortKeyInfo.moreSpecificThan(keyInfos)) {
                     distinctsToChange.add(distinct);
-                    return;
+                    return false;
                 }
             }
 
@@ -477,7 +522,7 @@
             if (secondarySortKeyInfo != null
                     && secondarySortKeyInfo.moreSpecificThan(keyInfos)) {
                 distinctsToChange.add(distinct);
-                return;
+                return false;
             }
 
             // Now set the secondary key
@@ -485,40 +530,22 @@
                 distinctsToChange.add(distinct);
                 secondarySortKeyInfo = keyInfos;
             }
-        }
-
-        @Override
-        public void visitLimit(POLimit limit) throws VisitorException {
+            return false;
         }
 
         // Accumulate column info
-        @Override
-        public void visitProject(POProject project) throws VisitorException {
+        public boolean processProject(POProject project) throws FrontendException {
             columnChainInfo.insertInReduce(project.isStar(), project
                     .getColumns(), project.getResultType());
-        }
-
-        @Override
-        public void visitFilter(POFilter filter) throws VisitorException {
-        }
-
-        @Override
-        public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
-            sawInvalidPhysicalOper = true;
-        }
-        
-        @Override
-        public void visitUnion(POUnion union) throws VisitorException {
-            sawInvalidPhysicalOper = true;
+            return false;
         }
 
         // Accumulate column info from nested project
-        @Override
-        public void visitPOForEach(POForEach fe) throws VisitorException {
+        public boolean processForEach(POForEach fe) throws FrontendException {
             if (fe.getInputPlans().size() > 1) {
                 // I may be wrong, but for now let's assume foreach plan before
                 // sort/distinct only have one foreach plan
-                throw new VisitorException(
+                throw new FrontendException(
                         "POForEach has more than 1 input plans");
             }
             boolean r = false;
@@ -527,19 +554,15 @@
                         columnChainInfo);
             } catch (PlanException e) {
                 int errorCode = 2205;
-                throw new VisitorException("Error visiting POForEach inner plan",
+                throw new FrontendException("Error visiting POForEach inner plan",
                         errorCode, e);
             }
             // See something other than POProject in POForEach, set the flag to stop further processing
-            if (r)
-                sawInvalidPhysicalOper = true;
+            return r;
         }
 
         // We see POSort, check which key it is using
-        @Override
-        public void visitSort(POSort sort) throws VisitorException {
-            if (sawInvalidPhysicalOper)
-                return;
+        public boolean processSort(POSort sort) throws FrontendException{
             SortKeyInfo keyInfo = new SortKeyInfo();
             for (int i = 0; i < sort.getSortPlans().size(); i++) {
                 PhysicalPlan sortPlan = sort.getSortPlans().get(i);
@@ -555,13 +578,12 @@
                     r = collectColumnChain(sortPlan, sortChainInfo);
                 } catch (PlanException e) {
                     int errorCode = 2206;
-                    throw new VisitorException("Error visiting POSort inner plan",
+                    throw new FrontendException("Error visiting POSort inner plan",
                             errorCode, e);
                 }
-                if (r) // if we saw physical operator other than project in sort plan
+                if (r==true) // if we saw physical operator other than project in sort plan
                 {
-                    sawInvalidPhysicalOper = true;
-                    return;
+                    return true;
                 }
                 keyInfo.insertColumnChainInfo(i, sortChainInfo, sort
                         .getMAscCols().get(i));
@@ -570,14 +592,14 @@
             for (SortKeyInfo sortKeyInfo : sortKeyInfos) {
                 if (sortKeyInfo.moreSpecificThan(keyInfo)) {
                     sortsToRemove.add(sort);
-                    return;
+                    return false;
                 }
             }
             // if it is part of secondary key
             if (secondarySortKeyInfo != null
                     && secondarySortKeyInfo.moreSpecificThan(keyInfo)) {
                 sortsToRemove.add(sort);
-                return;
+                return false;
             }
 
             // Now set the secondary key
@@ -585,6 +607,7 @@
                 sortsToRemove.add(sort);
                 secondarySortKeyInfo = keyInfo;
             }
+            return false;
         }
 
         public List<POSort> getSortsToRemove() {

Modified: hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSecondarySort.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSecondarySort.java?rev=901019&r1=901018&r2=901019&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSecondarySort.java (original)
+++ hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSecondarySort.java Wed Jan 20 00:53:05 2010
@@ -366,6 +366,26 @@
         assertTrue(so.getDistinctChanged()==0);
     }
     
+    // See PIG-1193
+    public void testSortOptimization8() throws Exception{
+        // Sort desc, used in UDF twice
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan("A = LOAD 'input1' AS (a0);");
+        planTester.buildPlan("B = group A all;");
+        planTester.buildPlan("C = foreach B { D = order A by $0 desc; generate DIFF(D, D);};");
+        
+        LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        
+        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+        so.visit();
+        
+        assertTrue(so.getNumMRUseSecondaryKey()==1);
+        assertTrue(so.getNumSortRemoved()==2);
+        assertTrue(so.getDistinctChanged()==0);
+    }
+    
     public void testNestedDistinctEndToEnd1() throws Exception{
         File tmpFile1 = File.createTempFile("test", "txt");
         PrintStream ps1 = new PrintStream(new FileOutputStream(tmpFile1));