Posted to commits@pig.apache.org by ha...@apache.org on 2010/01/28 20:57:21 UTC

svn commit: r904241 - in /hadoop/pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java test/org/apache/pig/test/TestLocalRearrange.java

Author: hashutosh
Date: Thu Jan 28 19:57:21 2010
New Revision: 904241

URL: http://svn.apache.org/viewvc?rev=904241&view=rev
Log: (empty)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestLocalRearrange.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=904241&r1=904240&r2=904241&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Jan 28 19:57:21 2010
@@ -78,6 +78,8 @@
 
 BUG FIXES
 
+PIG-1194:  ERROR 2055: Received Error while processing the map plan (rding via ashutoshc)
+
 PIG-1204:  Pig hangs when joining two streaming relations in local mode
 (rding)
 

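For readers of the CHANGES entry above: PIG-1194 covers the case where a GROUP BY key expression (for example a bincond over a field that can be null) evaluates to null. POLocalRearrange treated the resulting STATUS_NULL as a failure, and the job died with "ERROR 2055: Received Error while processing the map plan". Below is a minimal sketch of the affected pattern, run through PigServer in local mode with a made-up input file; the committed test at the bottom of this mail exercises the same query on a MiniCluster.

    import java.util.Iterator;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    // Sketch only: file name and data are illustrative, not taken from the commit.
    public class Pig1194Sketch {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL);

            // data.txt: tab-separated rows where the third field may be empty (null), e.g.
            //   10 <tab> 2 <tab> 3
            //   20 <tab> 3 <tab>
            pig.registerQuery("data = load 'data.txt' as (a0, a1, a2);");

            // The bincond key evaluates to null for the row whose a2 is null; with this
            // fix the null simply becomes a group key instead of failing the map plan.
            pig.registerQuery("grp = GROUP data BY"
                    + " (((double) a2)/((double) a1) > .001 OR a0 < 11 ? a0 : -1);");

            Iterator<Tuple> it = pig.openIterator("grp");
            while (it.hasNext()) {
                // expect one group for key 10 and one group for the null key
                System.out.println(it.next());
            }
        }
    }
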
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=904241&r1=904240&r2=904241&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Thu Jan 28 19:57:21 2010
@@ -57,7 +57,9 @@
 
     protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
 
-    transient private Log log = LogFactory.getLog(getClass());
+    private static Log log = LogFactory.getLog(POLocalRearrange.class);
+    
+    private static final Result ERR_RESULT = new Result();
 
     protected List<PhysicalPlan> plans;
     
@@ -251,7 +253,7 @@
     public Result getNext(Tuple t) throws ExecException {
         
         Result inp = null;
-        Result res = null;
+        Result res = ERR_RESULT;
         while (true) {
             inp = processInput();
             if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
@@ -308,9 +310,16 @@
                 case DataType.TUPLE:
                     res = op.getNext(dummyTuple);
                     break;
+                default:
+                    log.error("Invalid result type: " + DataType.findType(op.getResultType()));
+                    break;
                 }
-                if(res.returnStatus!=POStatus.STATUS_OK)
+                
+                // allow null as group by key
+                if (res.returnStatus != POStatus.STATUS_OK && res.returnStatus != POStatus.STATUS_NULL) {
                     return new Result();
+                }
+              
                 resLst.add(res);
             }
             
@@ -349,15 +358,24 @@
                     case DataType.TUPLE:
                         res = op.getNext(dummyTuple);
                         break;
+                    default:
+                        log.error("Invalid result type: " + DataType.findType(op.getResultType()));
+                        break;
                     }
-                    if(res.returnStatus!=POStatus.STATUS_OK)
+                    
+                    // allow null as group by key
+                    if (res.returnStatus != POStatus.STATUS_OK && res.returnStatus != POStatus.STATUS_NULL) {
                         return new Result();
+                    }
+                    
                     secondaryResLst.add(res);
                 }
             }
+            
             // If we are using secondary sort key, our new key is:
-            // (nullable, index, (key, secondary key), value)
-            res.result = constructLROutput(resLst,secondaryResLst,(Tuple)inp.result);
+            // (nullable, index, (key, secondary key), value)             
+            res.result = constructLROutput(resLst,secondaryResLst,(Tuple)inp.result);            
+            res.returnStatus = POStatus.STATUS_OK;
             
             return res;
         }
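
On the comment kept in the hunk above ("If we are using secondary sort key, our new key is: (nullable, index, (key, secondary key), value)"): the snippet below is only a literal rendering of that layout with TupleFactory, using made-up field values, to make the shape easier to picture. It is not an excerpt of constructLROutput, whose actual output representation may differ in detail.

    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    // Illustration of the commented layout, not production code.
    public class SecondaryKeyLayoutSketch {
        public static void main(String[] args) throws Exception {
            TupleFactory tf = TupleFactory.getInstance();

            // (key, secondary key) as described in the comment
            Tuple compoundKey = tf.newTuple(2);
            compoundKey.set(0, 10);      // group-by key (made-up value)
            compoundKey.set(1, "a");     // secondary sort key (made-up value)

            // (nullable, index, (key, secondary key), value) -- layout named in the comment
            Tuple out = tf.newTuple(4);
            out.set(0, false);           // nullable flag
            out.set(1, (byte) 0);        // index of the input this record came from
            out.set(2, compoundKey);     // the compound key
            out.set(3, tf.newTuple());   // value: the non-key fields, empty here
            System.out.println(out);
        }
    }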

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLocalRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLocalRearrange.java?rev=904241&r1=904240&r2=904241&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLocalRearrange.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLocalRearrange.java Thu Jan 28 19:57:21 2010
@@ -17,11 +17,21 @@
  */
 package org.apache.pig.test;
 
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 
+import junit.framework.Assert;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.DefaultTuple;
@@ -173,4 +183,52 @@
         assertEquals(db.size(), size);
     }
 
+    @Test
+    public void testMultiQueryJiraPig1194() {
+
+        // test case: POLocalRearrange doesn't handle nulls returned by POBinCond 
+        
+        String INPUT_FILE = "data.txt";
+        
+        final MiniCluster cluster = MiniCluster.buildCluster();
+        
+        try {
+            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+            w.println("10\t2\t3");
+            w.println("20\t3\t");
+            w.close();
+            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+            PigServer myPig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
+            myPig.registerQuery("data = load '" + INPUT_FILE + "' as (a0, a1, a2);");
+            myPig.registerQuery("grp = GROUP data BY (((double) a2)/((double) a1) > .001 OR a0 < 11 ? a0 : -1);");
+            
+            List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                    new String[] { 
+                            "(10,{(10,2,3)})",
+                            "(null,{(20,3,null)})"
+                    });
+            
+            Iterator<Tuple> iter = myPig.openIterator("grp");
+            int counter = 0;
+            while (iter.hasNext()) {
+                assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      
+            }
+            assertEquals(expectedResults.size(), counter);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } finally {
+            new File(INPUT_FILE).delete();
+            try {
+                Util.deleteFile(cluster, INPUT_FILE);
+            } catch (IOException e) {
+                e.printStackTrace();
+                Assert.fail();
+            }
+        }
+    }
+    
 }