Posted to commits@pig.apache.org by rd...@apache.org on 2010/03/10 18:54:51 UTC
svn commit: r921483 - in /hadoop/pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java test/org/apache/pig/test/TestLimitAdjuster.java
Author: rding
Date: Wed Mar 10 17:54:50 2010
New Revision: 921483
URL: http://svn.apache.org/viewvc?rev=921483&view=rev
Log:
PIG-1238: Dump does not respect the schema
Added:
hadoop/pig/trunk/test/org/apache/pig/test/TestLimitAdjuster.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
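
As the comment on the new fixProjectionAfterLimit method below explains, a projection following a LIMIT on a sorted relation was previously left in the sort job's reduce plan, so the sort keys could be lost before the extra limit-adjusting job ran. A minimal Pig Latin script exercising that pattern, mirroring the new TestLimitAdjuster test case (the trailing dump is an assumption for illustration; the test itself reads the results through openIterator):

    a = load 'input' as (x:int, y:chararray);
    b = order a by x parallel 2;
    c = limit b 1;
    d = foreach c generate y;
    dump d;   -- expected result per the new test: (orange)
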
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=921483&r1=921482&r2=921483&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Mar 10 17:54:50 2010
@@ -145,6 +145,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-1238: Dump does not respect the schema (rding)
+
PIG-1261: PigStorageSchema broke after changes to ResourceSchema (dvryaboy via
daijy)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=921483&r1=921482&r2=921483&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Mar 10 17:54:50 2010
@@ -38,22 +38,11 @@ import org.apache.pig.LoadFunc;
import org.apache.pig.OrderedLoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
-import org.apache.pig.builtin.BinStorage;
-import org.apache.pig.data.DataType;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.builtin.DefaultIndexableLoader;
-import org.apache.pig.impl.builtin.FindQuantiles;
-import org.apache.pig.impl.builtin.PoissonSampleLoader;
-import org.apache.pig.impl.builtin.GetMemNumRows;
-import org.apache.pig.impl.builtin.PartitionSkewedKeys;
-import org.apache.pig.impl.builtin.RandomSampleLoader;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecutionEngine;
import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
@@ -61,26 +50,38 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.DefaultIndexableLoader;
+import org.apache.pig.impl.builtin.FindQuantiles;
+import org.apache.pig.impl.builtin.GetMemNumRows;
+import org.apache.pig.impl.builtin.PartitionSkewedKeys;
+import org.apache.pig.impl.builtin.PoissonSampleLoader;
+import org.apache.pig.impl.builtin.RandomSampleLoader;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -94,7 +95,6 @@ import org.apache.pig.impl.util.Compiler
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Pair;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
/**
* The compiler that compiles a given physical plan
@@ -2414,7 +2414,7 @@ public class MRCompiler extends PhyPlanV
{
for (MapReduceOper mr:opsToAdjust)
{
- if (mr.reducePlan.isEmpty()) return;
+ if (mr.reducePlan.isEmpty()) continue;
List<PhysicalOperator> mpLeaves = mr.reducePlan.getLeaves();
if (mpLeaves.size() != 1) {
int errCode = 2024;
@@ -2442,28 +2442,33 @@ public class MRCompiler extends PhyPlanV
POLimit pLimit = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
pLimit.setLimit(mr.limit);
limitAdjustMROp.mapPlan.addAsLeaf(pLimit);
- if (mr.isGlobalSort())
+ if (mr.isGlobalSort()) {
connectMapToReduceLimitedSort(limitAdjustMROp, mr);
- else
+ } else {
simpleConnectMapToReduce(limitAdjustMROp);
+ }
POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
pLimit2.setLimit(mr.limit);
limitAdjustMROp.reducePlan.addAsLeaf(pLimit2);
- POStore st = getStore();
- st.setSFile(oldSpec);
- st.setIsTmpStore(oldIsTmpStore);
- limitAdjustMROp.reducePlan.addAsLeaf(st);
- limitAdjustMROp.requestedParallelism = 1;
- limitAdjustMROp.setLimitOnly(true);
+
// If the operator we're following has global sort set, we
// need to indicate that this is a limit after a sort.
// This will assure that we get the right sort comparator
// set. Otherwise our order gets wacked (PIG-461).
if (mr.isGlobalSort())
{
+ fixProjectionAfterLimit(limitAdjustMROp, mr);
limitAdjustMROp.setLimitAfterSort(true);
limitAdjustMROp.setSortOrder(mr.getSortOrder());
}
+
+ POStore st = getStore();
+ st.setSFile(oldSpec);
+ st.setIsTmpStore(oldIsTmpStore);
+ limitAdjustMROp.reducePlan.addAsLeaf(st);
+ limitAdjustMROp.requestedParallelism = 1;
+ limitAdjustMROp.setLimitOnly(true);
+
List<MapReduceOper> successorList = MRPlan.getSuccessors(mr);
MapReduceOper successors[] = null;
@@ -2494,6 +2499,32 @@ public class MRCompiler extends PhyPlanV
}
}
}
+
+ // Move all operators between POLimit and POStore in reducer plan
+ // from sortMROp to the new MROp so that the sort keys aren't lost by
+ // projection in sortMROp.
+ private void fixProjectionAfterLimit(MapReduceOper mro,
+ MapReduceOper sortMROp) throws PlanException, VisitorException {
+
+ PhysicalOperator op = sortMROp.reducePlan.getLeaves().get(0);
+
+ while (true) {
+ List<PhysicalOperator> preds = sortMROp.reducePlan
+ .getPredecessors(op);
+ op = preds.get(0);
+ if (op instanceof POLimit) break;
+ }
+
+ while (true) {
+ List<PhysicalOperator> succes = sortMROp.reducePlan
+ .getSuccessors(op);
+ PhysicalOperator succ = succes.get(0);
+ if (succ instanceof POStore) break;
+
+ sortMROp.reducePlan.removeAndReconnect(succ);
+ mro.reducePlan.addAsLeaf(succ);
+ }
+ }
}
private static class FindKeyTypeVisitor extends PhyPlanVisitor {
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestLimitAdjuster.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLimitAdjuster.java?rev=921483&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLimitAdjuster.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLimitAdjuster.java Wed Mar 10 17:54:50 2010
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.*;
+
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestLimitAdjuster {
+
+ private static final MiniCluster cluster = MiniCluster.buildCluster();
+
+ private PigServer pig;
+
+ @Before
+ public void setUp() throws Exception {
+ pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void simpleTest() throws Exception {
+ String INPUT_FILE = "input";
+
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+ w.println("1\torange");
+ w.println("2\tapple");
+ w.println("3\tcoconut");
+ w.println("4\tmango");
+ w.println("5\tgrape");
+ w.println("6\tpear");
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+ pig.registerQuery("a = load '" + INPUT_FILE + "' as (x:int, y:chararray);");
+ pig.registerQuery("b = order a by x parallel 2;");
+ pig.registerQuery("c = limit b 1;");
+ pig.registerQuery("d = foreach c generate y;");
+
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] { "('orange')" });
+
+ Iterator<Tuple> iter = pig.openIterator("d");
+ int counter = 0;
+ while (iter.hasNext()) {
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ }
+ assertEquals(expectedResults.size(), counter);
+ }
+
+}