You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datafu.apache.org by mh...@apache.org on 2016/02/16 20:32:57 UTC
[1/2] incubator-datafu git commit: DATAFU-114: Make FirstTupleFromBag implement Accumulator
Repository: incubator-datafu
Updated Branches:
refs/heads/master 92a56e49f -> a15a15c58
DATAFU-114: Make FirstTupleFromBag implement Accumulator
Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/f7c9b232
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/f7c9b232
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/f7c9b232
Branch: refs/heads/master
Commit: f7c9b232fe1f7a9a2c445f7c25bb73a3c6a099c1
Parents: 92a56e4
Author: Eyal Allweil <ey...@yahoo.com>
Authored: Thu Feb 4 16:03:51 2016 -0800
Committer: Matthew Hayes <ma...@gmail.com>
Committed: Thu Feb 4 16:04:05 2016 -0800
----------------------------------------------------------------------
.../java/datafu/pig/bags/FirstTupleFromBag.java | 37 +++++++++++++++++---
1 file changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/f7c9b232/datafu-pig/src/main/java/datafu/pig/bags/FirstTupleFromBag.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/bags/FirstTupleFromBag.java b/datafu-pig/src/main/java/datafu/pig/bags/FirstTupleFromBag.java
index 1f24984..01b0d61 100644
--- a/datafu-pig/src/main/java/datafu/pig/bags/FirstTupleFromBag.java
+++ b/datafu-pig/src/main/java/datafu/pig/bags/FirstTupleFromBag.java
@@ -21,11 +21,13 @@ package datafu.pig.bags;
import java.io.IOException;
-import datafu.pig.util.SimpleEvalFunc;
+import org.apache.pig.Accumulator;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import datafu.pig.util.SimpleEvalFunc;
+
/**
* Returns the first tuple from a bag. Requires a second parameter that will be returned if the bag is empty.
*
@@ -46,8 +48,36 @@ import org.apache.pig.impl.logicalLayer.schema.Schema;
* </pre>
*/
-public class FirstTupleFromBag extends SimpleEvalFunc<Tuple>
+public class FirstTupleFromBag extends SimpleEvalFunc<Tuple> implements Accumulator<Tuple>
{
+ private Tuple result = null;
+ private boolean found = false;
+
+ @Override
+ public void accumulate(Tuple tuple) throws IOException
+ {
+ if (found == false) {
+ DataBag bag = (DataBag) tuple.get(0);
+ Tuple defaultValue = (Tuple) tuple.get(1);
+
+ result = call(bag, defaultValue);
+ found = true;
+ }
+ }
+
+ @Override
+ public void cleanup()
+ {
+ found = false;
+ result = null;
+ }
+
+ @Override
+ public Tuple getValue()
+ {
+ return result;
+ }
+
public Tuple call(DataBag bag, Tuple defaultValue) throws IOException
{
for (Tuple t : bag) {
@@ -66,5 +96,4 @@ public class FirstTupleFromBag extends SimpleEvalFunc<Tuple>
return null;
}
}
-}
-
+}
\ No newline at end of file
[2/2] incubator-datafu git commit: DATAFU-114: Add accumulator test for FirstTupleFromBag
Posted by mh...@apache.org.
DATAFU-114: Add accumulator test for FirstTupleFromBag
Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/a15a15c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/a15a15c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/a15a15c5
Branch: refs/heads/master
Commit: a15a15c58164cc39c62235c0da6a7b1d8d7db9e2
Parents: f7c9b23
Author: Matthew Hayes <ma...@gmail.com>
Authored: Thu Feb 4 16:05:02 2016 -0800
Committer: Matthew Hayes <ma...@gmail.com>
Committed: Thu Feb 4 16:05:02 2016 -0800
----------------------------------------------------------------------
.../java/datafu/test/pig/bags/BagTests.java | 30 ++++++++++++++++++++
1 file changed, 30 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/a15a15c5/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java b/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
index a5851e4..9fdb250 100644
--- a/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
+++ b/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
@@ -22,6 +22,7 @@ package datafu.test.pig.bags;
import datafu.pig.bags.CountEach;
import datafu.pig.bags.DistinctBy;
import datafu.pig.bags.Enumerate;
+import datafu.pig.bags.FirstTupleFromBag;
import datafu.test.pig.PigTests;
import junit.framework.Assert;
@@ -34,6 +35,7 @@ import org.apache.pig.data.TupleFactory;
import org.apache.pig.pigunit.PigTest;
import org.testng.annotations.Test;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -45,6 +47,8 @@ import org.apache.pig.impl.util.Utils;
import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import com.beust.jcommander.internal.Lists;
+
public class BagTests extends PigTests
{
/**
@@ -278,6 +282,32 @@ result4 = FOREACH grouped GENERATE group AS a,TupleFromBag(data,0,$emptyTuple).b
assertOutput(test, "data2", "(1,(4))");
}
+
+ @Test
+ public void firstTupleFromBagAccumulateTest() throws Exception
+ {
+ TupleFactory tf = TupleFactory.getInstance();
+ BagFactory bf = BagFactory.getInstance();
+
+ FirstTupleFromBag op = new FirstTupleFromBag();
+
+ Tuple defaultValue = tf.newTuple(1000);
+ op.accumulate(tf.newTuple(Arrays.asList(bf.newDefaultBag(Arrays.asList(tf.newTuple(4))), defaultValue)));
+ op.accumulate(tf.newTuple(Arrays.asList(bf.newDefaultBag(Arrays.asList(tf.newTuple(9))), defaultValue)));
+ op.accumulate(tf.newTuple(Arrays.asList(bf.newDefaultBag(Arrays.asList(tf.newTuple(16))), defaultValue)));
+ assertEquals(op.getValue(), tf.newTuple(4));
+ op.cleanup();
+
+ op.accumulate(tf.newTuple(Arrays.asList(bf.newDefaultBag(Arrays.asList(tf.newTuple(11))), defaultValue)));
+ op.accumulate(tf.newTuple(Arrays.asList(bf.newDefaultBag(Arrays.asList(tf.newTuple(17))), defaultValue)));
+ op.accumulate(tf.newTuple(Arrays.asList(bf.newDefaultBag(Arrays.asList(tf.newTuple(5))), defaultValue)));
+ assertEquals(op.getValue(), tf.newTuple(11));
+ op.cleanup();
+
+ op.accumulate(tf.newTuple(Arrays.asList(bf.newDefaultBag(), defaultValue)));
+ assertEquals(op.getValue(), defaultValue);
+ op.cleanup();
+ }
/**