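You are viewing a plain-text version of this content. The canonical (linked) version is available from the original mailing-list archive page; the hyperlink was lost in this plain-text rendering.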
Posted to commits@datafu.apache.org by mh...@apache.org on 2016/02/16 20:32:57 UTC

[1/2] incubator-datafu git commit: DATAFU-114: Make FirstTupleFromBag implement Accumulator

Repository: incubator-datafu
Updated Branches:
  refs/heads/master 92a56e49f -> a15a15c58


DATAFU-114: Make FirstTupleFromBag implement Accumulator


Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/f7c9b232
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/f7c9b232
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/f7c9b232

Branch: refs/heads/master
Commit: f7c9b232fe1f7a9a2c445f7c25bb73a3c6a099c1
Parents: 92a56e4
Author: Eyal Allweil <ey...@yahoo.com>
Authored: Thu Feb 4 16:03:51 2016 -0800
Committer: Matthew Hayes <ma...@gmail.com>
Committed: Thu Feb 4 16:04:05 2016 -0800

----------------------------------------------------------------------
 .../java/datafu/pig/bags/FirstTupleFromBag.java | 37 +++++++++++++++++---
 1 file changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/f7c9b232/datafu-pig/src/main/java/datafu/pig/bags/FirstTupleFromBag.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/bags/FirstTupleFromBag.java b/datafu-pig/src/main/java/datafu/pig/bags/FirstTupleFromBag.java
index 1f24984..01b0d61 100644
--- a/datafu-pig/src/main/java/datafu/pig/bags/FirstTupleFromBag.java
+++ b/datafu-pig/src/main/java/datafu/pig/bags/FirstTupleFromBag.java
@@ -21,11 +21,13 @@ package datafu.pig.bags;
 
 import java.io.IOException;
 
-import datafu.pig.util.SimpleEvalFunc;
+import org.apache.pig.Accumulator;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 
+import datafu.pig.util.SimpleEvalFunc;
+
 /**
  * Returns the first tuple from a bag. Requires a second parameter that will be returned if the bag is empty.
  *
@@ -46,8 +48,36 @@ import org.apache.pig.impl.logicalLayer.schema.Schema;
  * </pre>
  */
 
-public class FirstTupleFromBag extends SimpleEvalFunc<Tuple>
+public class FirstTupleFromBag extends SimpleEvalFunc<Tuple> implements Accumulator<Tuple>
 {
+  private Tuple result = null;
+  private boolean found = false;
+
+  @Override
+  public void accumulate(Tuple tuple) throws IOException
+  {
+    if (found == false) {      
+      DataBag bag = (DataBag) tuple.get(0);
+      Tuple defaultValue = (Tuple) tuple.get(1);
+
+      result = call(bag, defaultValue);
+      found = true;
+    }
+  }
+
+  @Override
+  public void cleanup()
+  {
+    found = false;
+    result = null;
+  }
+
+  @Override
+  public Tuple getValue()
+  {
+    return result;
+  }
+    
   public Tuple call(DataBag bag, Tuple defaultValue) throws IOException
   {
     for (Tuple t : bag) {
@@ -66,5 +96,4 @@ public class FirstTupleFromBag extends SimpleEvalFunc<Tuple>
       return null;
     }
   }
-}
-
+}
\ No newline at end of file


[2/2] incubator-datafu git commit: DATAFU-114: Add accumulator test for FirstTupleFromBag

Posted by mh...@apache.org.
DATAFU-114: Add accumulator test for FirstTupleFromBag


Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/a15a15c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/a15a15c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/a15a15c5

Branch: refs/heads/master
Commit: a15a15c58164cc39c62235c0da6a7b1d8d7db9e2
Parents: f7c9b23
Author: Matthew Hayes <ma...@gmail.com>
Authored: Thu Feb 4 16:05:02 2016 -0800
Committer: Matthew Hayes <ma...@gmail.com>
Committed: Thu Feb 4 16:05:02 2016 -0800

----------------------------------------------------------------------
 .../java/datafu/test/pig/bags/BagTests.java     | 30 ++++++++++++++++++++
 1 file changed, 30 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/a15a15c5/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java b/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
index a5851e4..9fdb250 100644
--- a/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
+++ b/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
@@ -22,6 +22,7 @@ package datafu.test.pig.bags;
 import datafu.pig.bags.CountEach;
 import datafu.pig.bags.DistinctBy;
 import datafu.pig.bags.Enumerate;
+import datafu.pig.bags.FirstTupleFromBag;
 import datafu.test.pig.PigTests;
 import junit.framework.Assert;
 
@@ -34,6 +35,7 @@ import org.apache.pig.data.TupleFactory;
 import org.apache.pig.pigunit.PigTest;
 import org.testng.annotations.Test;
 
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -45,6 +47,8 @@ import org.apache.pig.impl.util.Utils;
 import org.apache.pig.builtin.Utf8StorageConverter;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 
+import com.beust.jcommander.internal.Lists;
+
 public class BagTests extends PigTests
 {
   /**
@@ -278,6 +282,32 @@ result4 = FOREACH grouped GENERATE group AS a,TupleFromBag(data,0,$emptyTuple).b
 
     assertOutput(test, "data2", "(1,(4))");
   }
+   
+  @Test
+  public void firstTupleFromBagAccumulateTest() throws Exception
+  {
+    TupleFactory tf = TupleFactory.getInstance();
+    BagFactory bf = BagFactory.getInstance();
+ 
+    FirstTupleFromBag op = new FirstTupleFromBag();
+    
+    Tuple defaultValue = tf.newTuple(1000);
+    op.accumulate(tf.newTuple(Arrays.asList(bf.newDefaultBag(Arrays.asList(tf.newTuple(4))), defaultValue)));
+    op.accumulate(tf.newTuple(Arrays.asList(bf.newDefaultBag(Arrays.asList(tf.newTuple(9))), defaultValue)));
+    op.accumulate(tf.newTuple(Arrays.asList(bf.newDefaultBag(Arrays.asList(tf.newTuple(16))), defaultValue)));
+    assertEquals(op.getValue(), tf.newTuple(4));
+    op.cleanup();
+    
+    op.accumulate(tf.newTuple(Arrays.asList(bf.newDefaultBag(Arrays.asList(tf.newTuple(11))), defaultValue)));
+    op.accumulate(tf.newTuple(Arrays.asList(bf.newDefaultBag(Arrays.asList(tf.newTuple(17))), defaultValue)));
+    op.accumulate(tf.newTuple(Arrays.asList(bf.newDefaultBag(Arrays.asList(tf.newTuple(5))), defaultValue)));
+    assertEquals(op.getValue(), tf.newTuple(11));
+    op.cleanup();
+    
+    op.accumulate(tf.newTuple(Arrays.asList(bf.newDefaultBag(), defaultValue)));
+    assertEquals(op.getValue(), defaultValue);
+    op.cleanup();
+  }
 
   /**