Posted to commits@pig.apache.org by rd...@apache.org on 2010/03/10 18:54:51 UTC

svn commit: r921483 - in /hadoop/pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java test/org/apache/pig/test/TestLimitAdjuster.java

Author: rding
Date: Wed Mar 10 17:54:50 2010
New Revision: 921483

URL: http://svn.apache.org/viewvc?rev=921483&view=rev
Log:
PIG-1238: Dump does not respect the schema
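
The failing shape is an ORDER BY run with PARALLEL > 1, followed by a LIMIT and a projection: the LimitAdjuster appends a single-reducer job to enforce the limit, and before this patch the projection stayed in the sort job, where it could strip out the sort key before the appended job's sort comparator got to use it. A minimal sketch of the reproduction, mirroring the new TestLimitAdjuster added below (the class name, local exec type, and two-row input are illustrative, not part of the commit):

    import java.io.FileWriter;
    import java.io.PrintWriter;
    import java.util.Iterator;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class Pig1238Repro {
        public static void main(String[] args) throws Exception {
            // Two tab-separated rows; order by x, keep one, project y.
            PrintWriter w = new PrintWriter(new FileWriter("input"));
            w.println("1\torange");
            w.println("2\tapple");
            w.close();

            // Local mode keeps the sketch self-contained; the committed
            // test runs against a MiniCluster in MAPREDUCE mode, which is
            // where the LimitAdjuster path is actually exercised.
            PigServer pig = new PigServer(ExecType.LOCAL);
            pig.registerQuery("a = load 'input' as (x:int, y:chararray);");
            // PARALLEL 2 forces the extra single-reducer limit job.
            pig.registerQuery("b = order a by x parallel 2;");
            pig.registerQuery("c = limit b 1;");
            pig.registerQuery("d = foreach c generate y;");

            Iterator<Tuple> it = pig.openIterator("d");  // what 'dump d' does
            while (it.hasNext()) {
                System.out.println(it.next());           // expected: (orange)
            }
        }
    }

With the pre-patch compiler the appended job could emit tuples that no longer matched d's schema or order, which is the "dump does not respect the schema" symptom in the JIRA title.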

Added:
    hadoop/pig/trunk/test/org/apache/pig/test/TestLimitAdjuster.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=921483&r1=921482&r2=921483&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Mar 10 17:54:50 2010
@@ -145,6 +145,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-1238: Dump does not respect the schema (rding)
+
 PIG-1261: PigStorageSchema broke after changes to ResourceSchema (dvryaboy via
 daijy)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=921483&r1=921482&r2=921483&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Mar 10 17:54:50 2010
@@ -38,22 +38,11 @@ import org.apache.pig.LoadFunc;
 import org.apache.pig.OrderedLoadFunc;
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
-import org.apache.pig.builtin.BinStorage;
-import org.apache.pig.data.DataType;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.builtin.DefaultIndexableLoader;
-import org.apache.pig.impl.builtin.FindQuantiles;
-import org.apache.pig.impl.builtin.PoissonSampleLoader;
-import org.apache.pig.impl.builtin.GetMemNumRows;
-import org.apache.pig.impl.builtin.PartitionSkewedKeys;
-import org.apache.pig.impl.builtin.RandomSampleLoader;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecutionEngine;
 import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
@@ -61,26 +50,38 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.DefaultIndexableLoader;
+import org.apache.pig.impl.builtin.FindQuantiles;
+import org.apache.pig.impl.builtin.GetMemNumRows;
+import org.apache.pig.impl.builtin.PartitionSkewedKeys;
+import org.apache.pig.impl.builtin.PoissonSampleLoader;
+import org.apache.pig.impl.builtin.RandomSampleLoader;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -94,7 +95,6 @@ import org.apache.pig.impl.util.Compiler
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Pair;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 
 /**
  * The compiler that compiles a given physical plan
@@ -2414,7 +2414,7 @@ public class MRCompiler extends PhyPlanV
         {
             for (MapReduceOper mr:opsToAdjust)
             {
-                if (mr.reducePlan.isEmpty()) return;
+                if (mr.reducePlan.isEmpty()) continue;
                 List<PhysicalOperator> mpLeaves = mr.reducePlan.getLeaves();
                 if (mpLeaves.size() != 1) {
                     int errCode = 2024; 
@@ -2442,28 +2442,33 @@ public class MRCompiler extends PhyPlanV
                 POLimit pLimit = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
                 pLimit.setLimit(mr.limit);
                 limitAdjustMROp.mapPlan.addAsLeaf(pLimit);
-                if (mr.isGlobalSort()) 
+                if (mr.isGlobalSort()) {
                     connectMapToReduceLimitedSort(limitAdjustMROp, mr);
-                else
+                } else {
                     simpleConnectMapToReduce(limitAdjustMROp);
+                }
                 POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
                 pLimit2.setLimit(mr.limit);
                 limitAdjustMROp.reducePlan.addAsLeaf(pLimit2);
-                POStore st = getStore();
-                st.setSFile(oldSpec);
-                st.setIsTmpStore(oldIsTmpStore);
-                limitAdjustMROp.reducePlan.addAsLeaf(st);
-                limitAdjustMROp.requestedParallelism = 1;
-                limitAdjustMROp.setLimitOnly(true);
+
                 // If the operator we're following has global sort set, we
                 // need to indicate that this is a limit after a sort.
                 // This will assure that we get the right sort comparator
                 // set.  Otherwise our order gets wacked (PIG-461).
                 if (mr.isGlobalSort()) 
                 {
+                    fixProjectionAfterLimit(limitAdjustMROp, mr);
                     limitAdjustMROp.setLimitAfterSort(true);
                     limitAdjustMROp.setSortOrder(mr.getSortOrder());
                 }
+                
+                POStore st = getStore();
+                st.setSFile(oldSpec);
+                st.setIsTmpStore(oldIsTmpStore);
+                limitAdjustMROp.reducePlan.addAsLeaf(st);
+                limitAdjustMROp.requestedParallelism = 1;
+                limitAdjustMROp.setLimitOnly(true);
+                
                 List<MapReduceOper> successorList = MRPlan.getSuccessors(mr);
                 MapReduceOper successors[] = null;
                 
@@ -2494,6 +2499,32 @@ public class MRCompiler extends PhyPlanV
                 }
             }
         }
+        
+        // Move all operators between POLimit and POStore in reducer plan 
+        // from sortMROp to the new MROp so that the sort keys aren't lost by 
+        // projection in sortMROp.
+        private void fixProjectionAfterLimit(MapReduceOper mro,
+                MapReduceOper sortMROp) throws PlanException, VisitorException {
+                        
+            PhysicalOperator op = sortMROp.reducePlan.getLeaves().get(0);
+            
+            while (true) {
+                List<PhysicalOperator> preds = sortMROp.reducePlan
+                        .getPredecessors(op);
+                op = preds.get(0); 
+                if (op instanceof POLimit) break;
+            }
+            
+            while (true) {
+                List<PhysicalOperator> succes = sortMROp.reducePlan
+                        .getSuccessors(op);
+                PhysicalOperator succ = succes.get(0);               
+                if (succ instanceof POStore) break;
+            
+                sortMROp.reducePlan.removeAndReconnect(succ);
+                mro.reducePlan.addAsLeaf(succ);
+            }
+        }
     }
 
     private static class FindKeyTypeVisitor extends PhyPlanVisitor {
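
The new fixProjectionAfterLimit helper above is a two-step walk over the sort job's reduce plan: walk predecessors up from the store leaf until the POLimit is found, then repeatedly detach the limit's successor and re-attach it as a leaf of the new job's reduce plan until only the POStore is left. A standalone sketch of the same walk, assuming a plain doubly linked chain in place of Pig's PhysicalPlan (the Node type and all names here are illustrative only):

    import java.util.ArrayList;
    import java.util.List;

    public class LimitProjectionSketch {

        // Illustrative stand-in for a reduce-plan operator; the real code
        // walks PhysicalOperator nodes inside a PhysicalPlan.
        static final class Node {
            final String kind;   // e.g. "POLimit", "POForEach", "POStore"
            Node prev, next;
            Node(String kind) { this.kind = kind; }
        }

        // Unlinks and returns, in order, the operators sitting between the
        // limit and the store -- mirroring the removeAndReconnect(succ) /
        // addAsLeaf(succ) loop in the patch.
        static List<Node> peelBetweenLimitAndStore(Node leaf) {
            // Step 1: walk predecessors from the leaf to find POLimit.
            Node op = leaf;
            while (!op.kind.equals("POLimit")) {
                op = op.prev;
            }
            // Step 2: detach successors until POStore is the immediate one.
            List<Node> moved = new ArrayList<Node>();
            while (!op.next.kind.equals("POStore")) {
                Node succ = op.next;
                op.next = succ.next;      // removeAndReconnect(succ)
                succ.next.prev = op;
                succ.prev = succ.next = null;
                moved.add(succ);          // addAsLeaf(succ) on the new job
            }
            return moved;
        }

        public static void main(String[] args) {
            // Chain: POLimit -> POForEach -> POStore (store is the leaf).
            Node limit = new Node("POLimit");
            Node fe = new Node("POForEach");
            Node store = new Node("POStore");
            limit.next = fe; fe.prev = limit;
            fe.next = store; store.prev = fe;
            // Prints POForEach: the projection now belongs to the new job,
            // so the sort key survives in the sort job's output.
            System.out.println(peelBetweenLimitAndStore(store).get(0).kind);
        }
    }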

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestLimitAdjuster.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLimitAdjuster.java?rev=921483&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLimitAdjuster.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLimitAdjuster.java Wed Mar 10 17:54:50 2010
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.*;
+
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestLimitAdjuster {
+
+    private static final MiniCluster cluster = MiniCluster.buildCluster();
+
+    private PigServer pig;
+    
+    @Before
+    public void setUp() throws Exception {
+        pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+    
+    @Test
+    public void simpleTest() throws Exception {
+        String INPUT_FILE = "input";
+        
+        PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+        w.println("1\torange");
+        w.println("2\tapple");
+        w.println("3\tcoconut");
+        w.println("4\tmango");
+        w.println("5\tgrape");
+        w.println("6\tpear");
+        w.close();
+        
+        Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+        
+        pig.registerQuery("a = load '" + INPUT_FILE + "' as (x:int, y:chararray);");
+        pig.registerQuery("b = order a by x parallel 2;");
+        pig.registerQuery("c = limit b 1;");
+        pig.registerQuery("d = foreach c generate y;");
+        
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] { "('orange')" });
+        
+        Iterator<Tuple> iter = pig.openIterator("d");
+        int counter = 0;
+        while (iter.hasNext()) {
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      
+        }
+        assertEquals(expectedResults.size(), counter);
+    }
+
+}
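
To run just the added test from trunk, the usual invocation with the Hadoop-style build file is "ant test -Dtestcase=TestLimitAdjuster" (target and property names assumed from the build conventions of the time; the test relies on the MiniCluster environment the harness sets up).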