You are viewing a plain-text version of this content; the canonical link to it ("here") was a hyperlink that is not preserved in this version.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/02/12 23:53:54 UTC
svn commit: r743915 - in /hadoop/pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
test/org/apache/pig/test/
Author: pradeepkth
Date: Thu Feb 12 22:53:53 2009
New Revision: 743915
URL: http://svn.apache.org/viewvc?rev=743915&view=rev
Log:
PIG-665: Map key type not correctly set (for use when key is null) when map plan does not have localrearrange
Added:
hadoop/pig/trunk/test/org/apache/pig/test/TestKeyTypeDiscoveryVisitor.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/KeyTypeDiscoveryVisitor.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
hadoop/pig/trunk/test/org/apache/pig/test/Util.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=743915&r1=743914&r2=743915&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Feb 12 22:53:53 2009
@@ -416,3 +416,6 @@
PIG-574: allowing to run scripts from within grunt shell (hagleitn via
olgan)
+
+ PIG-665: Map key type not correctly set (for use when key is null) when
+ map plan does not have localrearrange (pradeepkth)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/KeyTypeDiscoveryVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/KeyTypeDiscoveryVisitor.java?rev=743915&r1=743914&r2=743915&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/KeyTypeDiscoveryVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/KeyTypeDiscoveryVisitor.java Thu Feb 12 22:53:53 2009
@@ -20,15 +20,18 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.PigException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.impl.plan.DepthFirstWalker;
-import org.apache.pig.impl.plan.PlanWalker;
import org.apache.pig.impl.plan.VisitorException;
/**
@@ -51,32 +54,66 @@
@Override
public void visitMROp(MapReduceOper mr) throws VisitorException {
+ boolean foundKeyType = false; // true once any predecessor's reduce plan yields a key type
PhyPlanKeyTypeVisitor kvisitor = new PhyPlanKeyTypeVisitor(mr.mapPlan, mr);
kvisitor.visit();
+ if(!kvisitor.foundKeyType) {
+ // map plan had no POLocalRearrange: look for one in the reduce plan(s) of the predecessor job(s)
+ List<MapReduceOper> preds = mPlan.getPredecessors(mr);
+ // if there are no predecessors, then we are probably in a
+ // simple load-store script - there is no way to determine
+ // the key type in this case, which presumably means we don't
+ // need it
+ if(preds != null) {
+ Map<Byte, Integer> seen = new HashMap<Byte, Integer>(); // used only as a set of the distinct key types seen
+ for (MapReduceOper pred : preds) {
+ PhyPlanKeyTypeVisitor visitor = new PhyPlanKeyTypeVisitor(pred.reducePlan, mr); // records into mr.mapKeyType, not pred's
+ visitor.visit();
+ foundKeyType |= visitor.foundKeyType;
+ byte type = mr.mapKeyType; // NOTE(review): read even when this pred's visitor found no POLocalRearrange, so a stale/default type may be recorded - TODO confirm
+ seen.put(type, 1);
+ }
+ if(seen.size() > 1) {
+ // throw exception since we should get the same key type from all predecessors
+ int errorCode = 2119;
+ String message = "Internal Error: Found multiple data types for map key";
+ throw new VisitorException(message, errorCode, PigException.BUG);
+ }
+ // if we were not able to find the key type and
+ // if there is both a map and a reduce phase, then the
+ // map would need to send a valid key object and this
+ // can be an issue when the key is null - so error out here!
+ // if the reduce phase is empty, then this is a map-only job
+ // which will not need a key object -
+ // IMPORTANT NOTE: THIS RELIES ON THE FACT THAT CURRENTLY
+ // IN PigMapOnly.collect(), null IS SENT IN THE collect() CALL
+ // FOR THE KEY - IF THAT CHANGES, THEN THIS CODE HERE MAY NEED TO
+ // CHANGE!
+ if(!foundKeyType && !mr.reducePlan.isEmpty()) {
+ // throw exception since we were not able to determine the key type!
+ int errorCode = 2120;
+ String message = "Internal Error: Unable to determine data type for map key";
+ throw new VisitorException(message, errorCode, PigException.BUG);
+ }
+ }
+ }
}
class PhyPlanKeyTypeVisitor extends PhyPlanVisitor {
private MapReduceOper mro;
+ private boolean foundKeyType = false; // set once a POLocalRearrange has been visited in the walked plan
public PhyPlanKeyTypeVisitor(PhysicalPlan plan, MapReduceOper mro) {
super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
this.mro = mro;
}
- /* (non-Javadoc)
- * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage)
- */
- @Override
- public void visitPackage(POPackage pkg) throws VisitorException {
- this.mro.mapKeyType = pkg.getKeyType();
- }
-
-
@Override
public void visitLocalRearrange(POLocalRearrange lr)
throws VisitorException {
- this.mro.mapKeyType = lr.getKeyType();
+ this.mro.mapKeyType = lr.getKeyType(); // record the rearrange's key type on the target MR operator
+ foundKeyType = true;
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java?rev=743915&r1=743914&r2=743915&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java Thu Feb 12 22:53:53 2009
@@ -85,7 +85,12 @@
@Override
public String name() {
- return "PostCombinerPackage" + "[" + DataType.findTypeName(resultType) + "]" + "{" + DataType.findTypeName(keyType) + "}" +" - " + mKey.toString();
+ return "POCombinerPackage" + "[" + DataType.findTypeName(resultType) + "]" + "{" + DataType.findTypeName(keyType) + "}" +" - " + mKey.toString(); // name now matches the class name
+ }
+
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException { // overridden so visitors reach visitCombinerPackage for this operator
+ v.visitCombinerPackage(this);
}
/**
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestKeyTypeDiscoveryVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestKeyTypeDiscoveryVisitor.java?rev=743915&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestKeyTypeDiscoveryVisitor.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestKeyTypeDiscoveryVisitor.java Thu Feb 12 22:53:53 2009
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/**
+ * These test cases check that the type of the map key
+ * is correctly determined for use when the key is null. In
+ * particular, they test KeyTypeDiscoveryVisitor.
+ */
+public class TestKeyTypeDiscoveryVisitor extends TestCase {
+
+ MiniCluster cluster = MiniCluster.buildCluster();
+ private PigServer pigServer;
+
+ TupleFactory mTf = TupleFactory.getInstance(); // NOTE(review): mTf and mBf are not used by the test below
+ BagFactory mBf = BagFactory.getInstance();
+
+ @Before
+ public void setUp() throws Exception{
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ }
+
+ @Test
+ public void testNullJoin() throws Exception {
+ String[] inputData = new String[] { "\t7\t8", "\t8\t9", "1\t20\t30", "1\t20\t40" };
+ Util.createInputFile(cluster, "a.txt", inputData);
+
+ inputData = new String[] { "7\t2", "1\t5", "1\t10" };
+ Util.createInputFile(cluster, "b.txt", inputData);
+
+ String script = "a = load 'a.txt' as (x:int, y:int, z:int);" +
+ "b = load 'b.txt' as (x:int, y:int);" +
+ "b_group = group b by x;" +
+ "b_sum = foreach b_group generate flatten(group) as x, SUM(b.y) as clicks;" +
+ // b_sum will have {(7, 2L), (1, 15L)}
+ "a_group = group a by (x, y);" +
+ "a_aggs = foreach a_group generate flatten(group) as (x, y), SUM(a.z) as zs;" +
+ // a_aggs will have {(<null>, 7, 8L), (<null>, 8, 9L), (1, 20, 70L)}
+ // The join in the next statement is on "x", which is the first column.
+ // The nulls in the first two records of a_aggs will test whether
+ // KeyTypeDiscoveryVisitor had set a valid keyType (in this case INTEGER).
+ // The null records (and b_sum's unmatched (7, 2L)) get discarded by the join, hence the join
+ // output would be {(1, 15L, 1, 20, 70L)}
+ "join_a_b = join b_sum by x, a_aggs by x;";
+ Util.registerQuery(pigServer, script);
+ Iterator<Tuple> it = pigServer.openIterator("join_a_b");
+ Object[] results = new Object[] { 1, 15L, 1, 20, 70L};
+ Tuple output = it.next();
+ assertFalse(it.hasNext());
+ assertEquals(results.length, output.size());
+ for (int i = 0; i < output.size(); i++) {
+ assertEquals(results[i], output.get(i));
+ }
+
+ }
+
+}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/Util.java?rev=743915&r1=743914&r2=743915&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/Util.java Thu Feb 12 22:53:53 2009
@@ -32,6 +32,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
@@ -333,4 +334,11 @@
comp.compile();
return comp.getMRPlan();
}
+
+ public static void registerQuery(PigServer pigServer, String query) throws IOException { // registers each ';'-separated statement of query in order
+ String[] queryLines = query.split(";"); // NOTE(review): naive split - a ';' inside a quoted literal would also split a statement
+ for (String line : queryLines) { // String.split drops trailing empty strings, so a final ';' adds no empty statement
+ pigServer.registerQuery(line + ";");
+ }
+ }
}