You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@pig.apache.org by dv...@apache.org on 2012/09/12 05:09:51 UTC

svn commit: r1383741 - in /pig/trunk: CHANGES.txt src/org/apache/pig/builtin/TOP.java test/org/apache/pig/test/TestBuiltin.java

Author: dvryaboy
Date: Wed Sep 12 03:09:50 2012
New Revision: 1383741

URL: http://svn.apache.org/viewvc?rev=1383741&view=rev
Log:
PIG-2915: Builtin TOP udf is sensitive to null input bags

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/builtin/TOP.java
    pig/trunk/test/org/apache/pig/test/TestBuiltin.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1383741&r1=1383740&r2=1383741&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Sep 12 03:09:50 2012
@@ -25,6 +25,8 @@ PIG-1891 Enable StoreFunc to make intell
 
 IMPROVEMENTS
 
+PIG-2915: Builtin TOP udf is sensitive to null input bags (hazen via dvryaboy)
+
 PIG-2901: Errors and lacks in document "Pig Latin Basics" (miyakawataku via billgraham)
 
 PIG-2905: Improve documentation around REPLACE (cheolsoo via billgraham)

Modified: pig/trunk/src/org/apache/pig/builtin/TOP.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/TOP.java?rev=1383741&r1=1383740&r2=1383741&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/TOP.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/TOP.java Wed Sep 12 03:09:50 2012
@@ -111,6 +111,9 @@ public class TOP extends EvalFunc<DataBa
             int n = (Integer) tuple.get(0);
             int fieldNum = (Integer) tuple.get(1);
             DataBag inputBag = (DataBag) tuple.get(2);
+            if (inputBag == null) {
+                return null;
+            }
             PriorityQueue<Tuple> store = new PriorityQueue<Tuple>(n + 1,
                     new TupleComparator(fieldNum));
             updateTop(store, n, inputBag);
@@ -205,6 +208,9 @@ public class TOP extends EvalFunc<DataBa
                 int n = (Integer) tuple.get(0);
                 int fieldNum = (Integer) tuple.get(1);
                 DataBag inputBag = (DataBag) tuple.get(2);
+                if (inputBag == null) {
+                    return null;
+                }
                 Tuple retTuple = mTupleFactory.newTuple(3);
                 DataBag outputBag = mBagFactory.newDefaultBag();
                 // initially, there should only be one, so not much point in doing the priority queue
@@ -249,25 +255,36 @@ public class TOP extends EvalFunc<DataBa
                 int n = (Integer) peekTuple.get(0);
                 int fieldNum = (Integer) peekTuple.get(1);
                 DataBag inputBag = (DataBag) peekTuple.get(2);
+                boolean allInputBagsNull = true;
 
                 PriorityQueue<Tuple> store = new PriorityQueue<Tuple>(n + 1,
                         new TupleComparator(fieldNum));
 
-                updateTop(store, n, inputBag);
+                if (inputBag != null) {
+                    allInputBagsNull = false;
+                    updateTop(store, n, inputBag);
+                }
 
                 while (intermediateIterator.hasNext()) {
                     Tuple t = intermediateIterator.next();
                     if (t == null || t.size() < 3 ) continue;
-                    updateTop(store, n, (DataBag) t.get(2));
+                    inputBag = (DataBag) t.get(2);
+                    if (inputBag != null) {
+                        allInputBagsNull = false;
+                        updateTop(store, n, inputBag);
+                    }
                 }   
 
-                DataBag outputBag = mBagFactory.newDefaultBag();
-                for (Tuple t : store) {
-                    outputBag.add(t);
-                }
                 Tuple retTuple = mTupleFactory.newTuple(3);
                 retTuple.set(0, n);
                 retTuple.set(1,fieldNum);
+                DataBag outputBag = null;
+                if (!allInputBagsNull) {
+                    outputBag = mBagFactory.newDefaultBag();
+                    for (Tuple t : store) {
+                        outputBag.add(t);
+                    }
+                }
                 retTuple.set(2, outputBag);
                 if (log.isDebugEnabled()) { 
                     if (randomizer.nextInt(1000) == 1) log.debug("outputting "+retTuple.toDelimitedString("\t")); 
@@ -315,18 +332,30 @@ public class TOP extends EvalFunc<DataBa
                 int n = (Integer) peekTuple.get(0);
                 int fieldNum = (Integer) peekTuple.get(1);
                 DataBag inputBag = (DataBag) peekTuple.get(2);
+                boolean allInputBagsNull = true;
 
                 PriorityQueue<Tuple> store = new PriorityQueue<Tuple>(n + 1,
                         new TupleComparator(fieldNum));
 
-                updateTop(store, n, inputBag);
+                if (inputBag != null) {
+                    allInputBagsNull = false;
+                    updateTop(store, n, inputBag);
+                }
 
                 while (intermediateIterator.hasNext()) {
                     Tuple t = intermediateIterator.next();
                     if (t == null || t.size() < 3 ) continue;
-                    updateTop(store, n, (DataBag) t.get(2));
+                    inputBag = (DataBag) t.get(2);
+                    if (inputBag != null) {
+                        allInputBagsNull = false;
+                        updateTop(store, n, inputBag);
+                    }
                 }   
 
+                if (allInputBagsNull) {
+                    return null;
+                }
+                
                 DataBag outputBag = mBagFactory.newDefaultBag();
                 for (Tuple t : store) {
                     outputBag.add(t);

Modified: pig/trunk/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBuiltin.java?rev=1383741&r1=1383740&r2=1383741&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBuiltin.java Wed Sep 12 03:09:50 2012
@@ -89,7 +89,6 @@ import org.apache.pig.builtin.SubtractDu
 import org.apache.pig.builtin.TOBAG;
 import org.apache.pig.builtin.TOKENIZE;
 import org.apache.pig.builtin.TOMAP;
-import org.apache.pig.builtin.TOP;
 import org.apache.pig.builtin.TOTUPLE;
 import org.apache.pig.builtin.TRIM;
 import org.apache.pig.builtin.TextLoader;
@@ -1976,53 +1975,7 @@ public class TestBuiltin {
 	assertEquals("", m.get("k2"), 2.0);
 	assertEquals("", m.get("k3"), "foo");
 
-
-        TOP top = new TOP();
-        TupleFactory tupleFactory = TupleFactory.getInstance();
-        BagFactory bagFactory = DefaultBagFactory.getInstance();
-        Tuple inputTuple = tupleFactory.newTuple(3);
-        DataBag dBag = bagFactory.newDefaultBag();
-
-        // set N = 10 i.e retain top 10 tuples
-        inputTuple.set(0, 10);
-        // compare tuples by field number 1
-        inputTuple.set(1, 1);
-        // set the data bag containing the tuples
-        inputTuple.set(2, dBag);
-
-        // generate tuples of the form (group-1, 1), (group-2, 2) ...
-        for (long i = 0; i < 100; i++) {
-            Tuple nestedTuple = tupleFactory.newTuple(2);
-            nestedTuple.set(0, "group-" + i);
-            nestedTuple.set(1, i);
-            dBag.add(nestedTuple);
-        }
-
-        DataBag outBag = top.exec(inputTuple);
-        assertEquals(outBag.size(), 10L);
-        checkItemsGT(outBag, 1, 89);
-
-        // two initial results
-        Tuple init1 = (new TOP.Initial()).exec(inputTuple);
-        Tuple init2 = (new TOP.Initial()).exec(inputTuple);
-        // two intermediate results
-
-        DataBag intermedBag = bagFactory.newDefaultBag();
-        intermedBag.add(init1);
-        intermedBag.add(init2);
-        Tuple intermedInput = tupleFactory.newTuple(intermedBag);
-        Tuple intermedOutput1 = (new TOP.Intermed()).exec(intermedInput);
-        Tuple intermedOutput2 = (new TOP.Intermed()).exec(intermedInput);
-        checkItemsGT((DataBag)intermedOutput1.get(2), 1, 94);
-
-        // final result
-        DataBag finalInputBag = bagFactory.newDefaultBag();
-        finalInputBag.add(intermedOutput1);
-        finalInputBag.add(intermedOutput2);
-        Tuple finalInput = tupleFactory.newTuple(finalInputBag);
-        outBag = (new TOP.Final()).exec(finalInput);
-        assertEquals(outBag.size(), 10L);
-        checkItemsGT(outBag, 1, 96);
+        // TOP - tests migrated to org.apache.pig.builtin.TestTop
     }
 
     @Test