You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/02/12 23:53:54 UTC

svn commit: r743915 - in /hadoop/pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ test/org/apache/pig/test/

Author: pradeepkth
Date: Thu Feb 12 22:53:53 2009
New Revision: 743915

URL: http://svn.apache.org/viewvc?rev=743915&view=rev
Log:
PIG-665: Map key type not correctly set (for use when key is null) when map plan does not have localrearrange

Added:
    hadoop/pig/trunk/test/org/apache/pig/test/TestKeyTypeDiscoveryVisitor.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/KeyTypeDiscoveryVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
    hadoop/pig/trunk/test/org/apache/pig/test/Util.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=743915&r1=743914&r2=743915&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Feb 12 22:53:53 2009
@@ -416,3 +416,6 @@
 
     PIG-574: allowing to run scripts from within grunt shell (hagleitn via
     olgan)
+
+    PIG-665: Map key type not correctly set (for use when key is null) when
+    map plan does not have localrearrange (pradeepkth)

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/KeyTypeDiscoveryVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/KeyTypeDiscoveryVisitor.java?rev=743915&r1=743914&r2=743915&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/KeyTypeDiscoveryVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/KeyTypeDiscoveryVisitor.java Thu Feb 12 22:53:53 2009
@@ -20,15 +20,18 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.PigException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.impl.plan.DepthFirstWalker;
-import org.apache.pig.impl.plan.PlanWalker;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
@@ -51,32 +54,66 @@
     
     @Override
     public void visitMROp(MapReduceOper mr) throws VisitorException {
+        boolean foundKeyType = false;
         PhyPlanKeyTypeVisitor kvisitor = new PhyPlanKeyTypeVisitor(mr.mapPlan, mr);
         kvisitor.visit();
+        if(!kvisitor.foundKeyType) {
+            // look for the key type from a POLocalRearrange in the previous reduce
+            List<MapReduceOper> preds = mPlan.getPredecessors(mr);
+            // if there are no predecessors, then we probably are in a 
+            // simple load-store script - there is no way to figure out
+            // the key type in this case which probably means we don't need
+            // to figure it out
+            if(preds != null) {
+                Map<Byte, Integer> seen = new HashMap<Byte, Integer>();
+                for (MapReduceOper pred : preds) {
+                    PhyPlanKeyTypeVisitor visitor = new PhyPlanKeyTypeVisitor(pred.reducePlan, mr);
+                    visitor.visit();
+                    foundKeyType |= visitor.foundKeyType;
+                    byte type = mr.mapKeyType;
+                    seen.put(type, 1);
+                }
+                if(seen.size() > 1) {
+                    // throw exception since we should get the same key type from all predecessors
+                    int errorCode = 2119;
+                    String message = "Internal Error: Found multiple data types for map key";
+                    throw new VisitorException(message, errorCode, PigException.BUG);
+                }
+                // if we were not able to determine the key type and
+                // if there is a map and reduce phase, then the
+                // map would need to send a valid key object and this
+                // can be an issue when the key is null - so error out here!
+                // if the reduce phase is empty, then this is a map only job
+                // which will not need a key object -
+                // IMPORTANT NOTE: THIS RELIES ON THE FACT THAT CURRENTLY
+                // IN PigMapOnly.collect(), null IS SENT IN THE collect() CALL
+                // FOR THE KEY - IF THAT CHANGES, THEN THIS CODE HERE MAY NEED TO
+                // CHANGE!
+                if(!foundKeyType && !mr.reducePlan.isEmpty()) {
+                    // throw exception since we were not able to determine key type!
+                    int errorCode = 2120;
+                    String message = "Internal Error: Unable to determine data type for map key";
+                    throw new VisitorException(message, errorCode, PigException.BUG);
+                }
+            }
+        }
     }
     
     class PhyPlanKeyTypeVisitor extends PhyPlanVisitor {
         
         private MapReduceOper mro;
+        private boolean foundKeyType = false;
         
         public PhyPlanKeyTypeVisitor(PhysicalPlan plan, MapReduceOper mro) {
             super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
             this.mro = mro;
         }
         
-        /* (non-Javadoc)
-         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage)
-         */
-        @Override
-        public void visitPackage(POPackage pkg) throws VisitorException {
-            this.mro.mapKeyType = pkg.getKeyType();        
-        }
-    
-    
         @Override
         public void visitLocalRearrange(POLocalRearrange lr)
                 throws VisitorException {
-            this.mro.mapKeyType = lr.getKeyType();        
+            this.mro.mapKeyType = lr.getKeyType();
+            foundKeyType = true;
         }
     }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java?rev=743915&r1=743914&r2=743915&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java Thu Feb 12 22:53:53 2009
@@ -85,7 +85,12 @@
 
     @Override
     public String name() {
-        return "PostCombinerPackage" + "[" + DataType.findTypeName(resultType) + "]" + "{" + DataType.findTypeName(keyType) + "}" +" - " + mKey.toString();
+        return "POCombinerPackage" + "[" + DataType.findTypeName(resultType) + "]" + "{" + DataType.findTypeName(keyType) + "}" +" - " + mKey.toString();
+    }
+    
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitCombinerPackage(this);
     }
     
     /**

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestKeyTypeDiscoveryVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestKeyTypeDiscoveryVisitor.java?rev=743915&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestKeyTypeDiscoveryVisitor.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestKeyTypeDiscoveryVisitor.java Thu Feb 12 22:53:53 2009
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/**
+ * The test cases here verify that the type of the map key
+ * is correctly determined for use when the key is null. In
+ * particular, they test KeyTypeDiscoveryVisitor.
+ */
+public class TestKeyTypeDiscoveryVisitor extends TestCase {
+
+    MiniCluster cluster = MiniCluster.buildCluster();
+    private PigServer pigServer;
+
+    TupleFactory mTf = TupleFactory.getInstance();
+    BagFactory mBf = BagFactory.getInstance();
+    
+    @Before
+    public void setUp() throws Exception{
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    }
+    
+    @Test
+    public void testNullJoin() throws Exception {
+        String[] inputData = new String[] { "\t7\t8", "\t8\t9", "1\t20\t30", "1\t20\t40" };
+        Util.createInputFile(cluster, "a.txt", inputData);
+        
+        inputData = new String[] { "7\t2", "1\t5", "1\t10" };
+        Util.createInputFile(cluster, "b.txt", inputData);
+        
+        String script = "a = load 'a.txt' as (x:int, y:int, z:int);" +
+        		"b = load 'b.txt' as (x:int, y:int);" +
+        		"b_group = group b by x;" +
+        		"b_sum = foreach b_group generate flatten(group) as x, SUM(b.y) as clicks;" +
+        		// b_sum will have {(1, 15L), (7, 2L)}
+        		"a_group = group a by (x, y);" +
+        		"a_aggs = foreach a_group generate flatten(group) as (x, y), SUM(a.z) as zs;" +
+        		// a_aggs will have {(<null>, 7, 8L), (<null>, 8, 9L), (1, 20, 70L)}
+        		// The join in the next statement is on "x" which is the first column
+        		// The nulls in the first two records of a_aggs will test whether
+        		// KeyTypeDiscoveryVisitor had set a valid keyType (in this case INTEGER)
+        		// The null records will get discarded by the join and hence the join
+        		// output would be {(1, 15L, 1, 20, 70L)} 
+        		"join_a_b = join b_sum by x, a_aggs by x;";
+        Util.registerQuery(pigServer, script);
+        Iterator<Tuple> it = pigServer.openIterator("join_a_b");
+        Object[] results = new Object[] { 1, 15L, 1, 20, 70L};
+        Tuple output = it.next();
+        assertFalse(it.hasNext());
+        assertEquals(results.length, output.size());
+        for (int i = 0; i < output.size(); i++) {
+            assertEquals(results[i], output.get(i));
+        }
+            
+    }
+
+}

Modified: hadoop/pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/Util.java?rev=743915&r1=743914&r2=743915&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/Util.java Thu Feb 12 22:53:53 2009
@@ -32,6 +32,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
@@ -333,4 +334,11 @@
         comp.compile();
         return comp.getMRPlan();	
     }
+    
+    public static void registerQuery(PigServer pigServer, String query) throws IOException {
+        String[] queryLines = query.split(";");
+        for (String line : queryLines) {
+            pigServer.registerQuery(line + ";");
+        }
+    }
 }