You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2017/03/21 11:18:49 UTC
apex-malhar git commit: APEXMALHAR-2400 removing dependency of output
fieldnames to input field names
Repository: apex-malhar
Updated Branches:
refs/heads/master 6a31088c0 -> e29b7c6de
APEXMALHAR-2400 removing dependency of output fieldnames to input field names
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/e29b7c6d
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/e29b7c6d
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/e29b7c6d
Branch: refs/heads/master
Commit: e29b7c6de8187cb1f2209a495dd0caac97c4dd61
Parents: 6a31088
Author: Hitesh-Scorpio <fo...@gmail.com>
Authored: Tue Mar 14 15:56:48 2017 +0530
Committer: Hitesh-Scorpio <fo...@gmail.com>
Committed: Tue Mar 21 15:30:08 2017 +0530
----------------------------------------------------------------------
.../window/accumulation/AbstractPojoJoin.java | 44 ++++++-
.../window/accumulation/PojoFullOuterJoin.java | 37 +++++-
.../lib/window/accumulation/PojoInnerJoin.java | 6 +
.../window/accumulation/PojoLeftOuterJoin.java | 16 ++-
.../window/accumulation/PojoRightOuterJoin.java | 16 ++-
.../window/accumulation/PojoInnerJoinTest.java | 32 +++++
.../window/accumulation/PojoOuterJoinTest.java | 126 +++++++++++++++++++
7 files changed, 269 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e29b7c6d/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
index a0b3fb3..8fe7df3 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
@@ -33,8 +33,11 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
+import com.datatorrent.lib.util.KeyValPair;
import com.datatorrent.lib.util.PojoUtils;
+import static org.apache.apex.malhar.lib.window.accumulation.AbstractPojoJoin.STREAM.LEFT;
+
/**
* Join Accumulation for Pojo Streams.
*
@@ -47,9 +50,14 @@ public abstract class AbstractPojoJoin<InputT1, InputT2>
protected Class<?> outClass;
private transient Map<String,PojoUtils.Getter> gettersStream1;
private transient Map<String,PojoUtils.Getter> gettersStream2;
- private transient Map<String,PojoUtils.Setter> setters;
+ protected transient Map<String,PojoUtils.Setter> setters;
+ protected transient Map<String, KeyValPair<STREAM, String>> outputToInputMap;
protected transient String[] leftKeys;
protected transient String[] rightKeys;
+ public enum STREAM
+ {
+ LEFT, RIGHT
+ }
public AbstractPojoJoin()
{
@@ -58,6 +66,10 @@ public abstract class AbstractPojoJoin<InputT1, InputT2>
outClass = null;
}
+ /**
+ * Use this constructor when all fields of the output POJO are to be included and the field
+ * names of the output POJO match the field names of the POJOs arriving on the input streams.
+ */
public AbstractPojoJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys)
{
if (leftKeys.length != rightKeys.length) {
@@ -68,6 +80,16 @@ public abstract class AbstractPojoJoin<InputT1, InputT2>
this.outClass = outClass;
}
+ /**
+ * Use this constructor to include only specific fields of the output POJO and/or
+ * to supply a mapping from each output POJO field name to the input stream (LEFT or
+ * RIGHT) and the field name of the input POJO from which its value is taken.
+ */
+ public AbstractPojoJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys, Map<String, KeyValPair<STREAM, String>> outputToInputMap)
+ {
+ this(outClass,leftKeys,rightKeys);
+ this.outputToInputMap = outputToInputMap;
+ }
private void createSetters()
{
@@ -220,8 +242,24 @@ public abstract class AbstractPojoJoin<InputT1, InputT2>
}
for (Object lObj:left) {
for (Object rObj:right) {
- setObjectForResult(leftGettersStream, lObj,o);
- setObjectForResult(rightGettersStream, rObj,o);
+ if (outputToInputMap != null) {
+ for (Map.Entry<String, KeyValPair<STREAM,String>> entry : outputToInputMap.entrySet()) {
+ KeyValPair<STREAM,String> kv = entry.getValue();
+ Object reqObject;
+ Map<String,PojoUtils.Getter> reqStream;
+ if (kv.getKey() == LEFT) {
+ reqObject = leftStreamIndex == 0 ? lObj : rObj;
+ reqStream = leftStreamIndex == 0 ? leftGettersStream : rightGettersStream;
+ } else {
+ reqObject = leftStreamIndex == 0 ? rObj : lObj;
+ reqStream = leftStreamIndex == 0 ? rightGettersStream : leftGettersStream;
+ }
+ setters.get(entry.getKey()).set(o,reqStream.get(entry.getValue().getValue()).get(reqObject));
+ }
+ } else {
+ setObjectForResult(leftGettersStream, lObj, o);
+ setObjectForResult(rightGettersStream, rObj, o);
+ }
}
result.add(o);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e29b7c6d/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
index edee827..c74ded3 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.base.Throwables;
import com.google.common.collect.Multimap;
+import com.datatorrent.lib.util.KeyValPair;
import com.datatorrent.lib.util.PojoUtils;
/**
@@ -47,6 +48,11 @@ public class PojoFullOuterJoin<InputT1, InputT2>
super(outClass,leftKeys,rightKeys);
}
+ public PojoFullOuterJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys, Map<String, KeyValPair<STREAM, String>> outputToInputMap)
+ {
+ super(outClass,leftKeys,rightKeys, outputToInputMap);
+ }
+
@Override
public void addNonMatchingResult(Collection<Object> left, Map<String,PojoUtils.Getter> leftGettersStream, List<Object> result)
{
@@ -57,7 +63,15 @@ public class PojoFullOuterJoin<InputT1, InputT2>
} catch (Throwable e) {
throw Throwables.propagate(e);
}
- setObjectForResult(leftGettersStream, lObj, o);
+ if (outputToInputMap != null) {
+ for (Map.Entry<String, KeyValPair<STREAM,String>> entry : outputToInputMap.entrySet()) {
+ if (entry.getValue().getKey() == STREAM.LEFT) {
+ setters.get(entry.getKey()).set(o, leftGettersStream.get(entry.getValue().getValue()).get(lObj));
+ }
+ }
+ } else {
+ setObjectForResult(leftGettersStream, lObj, o);
+ }
result.add(o);
}
}
@@ -67,13 +81,30 @@ public class PojoFullOuterJoin<InputT1, InputT2>
Map<String,PojoUtils.Getter> rightGettersStream, List<Object> result)
{
for (Object key : rightStream.keySet()) {
- addNonMatchingResult(rightStream.get((List)key), rightGettersStream, result);
+ if (outputToInputMap == null) {
+ addNonMatchingResult(rightStream.get((List)key), rightGettersStream, result);
+ } else {
+ for (Object obj: rightStream.get((List)key)) {
+ Object o;
+ try {
+ o = outClass.newInstance();
+ } catch (Throwable e) {
+ throw Throwables.propagate(e);
+ }
+ for (Map.Entry<String, KeyValPair<STREAM,String>> entry : outputToInputMap.entrySet()) {
+ if (entry.getValue().getKey() == STREAM.RIGHT) {
+ setters.get(entry.getKey()).set(o, rightGettersStream.get(entry.getValue().getValue()).get(obj));
+ }
+ }
+ result.add(o);
+ }
+ }
}
}
@Override
public int getLeftStreamIndex()
{
- return 1;
+ return 0;
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e29b7c6d/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
index a6421fa..1dcc7e5 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
@@ -24,6 +24,7 @@ import java.util.Map;
import com.google.common.collect.Multimap;
+import com.datatorrent.lib.util.KeyValPair;
import com.datatorrent.lib.util.PojoUtils;
/**
@@ -59,6 +60,11 @@ public class PojoInnerJoin<InputT1, InputT2>
super(outClass,leftKeys,rightKeys);
}
+ public PojoInnerJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys, Map<String, KeyValPair<STREAM, String>> outputToInputMap)
+ {
+ super(outClass,leftKeys,rightKeys, outputToInputMap);
+ }
+
@Override
public void addNonMatchingResult(Collection<Object> left, Map<String,PojoUtils.Getter> leftGettersStream, List<Object> result)
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e29b7c6d/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
index 4317e30..5405ca5 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.base.Throwables;
import com.google.common.collect.Multimap;
+import com.datatorrent.lib.util.KeyValPair;
import com.datatorrent.lib.util.PojoUtils;
/**
@@ -47,6 +48,11 @@ public class PojoLeftOuterJoin<InputT1, InputT2>
super(outClass,leftKeys,rightKeys);
}
+ public PojoLeftOuterJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys, Map<String, KeyValPair<STREAM, String>> outputToInputMap)
+ {
+ super(outClass,leftKeys,rightKeys, outputToInputMap);
+ }
+
@Override
public void addNonMatchingResult(Collection<Object> left, Map<String,PojoUtils.Getter> leftGettersStream, List<Object> result)
{
@@ -57,7 +63,15 @@ public class PojoLeftOuterJoin<InputT1, InputT2>
} catch (Throwable e) {
throw Throwables.propagate(e);
}
- setObjectForResult(leftGettersStream, lObj, o);
+ if (outputToInputMap != null) {
+ for (Map.Entry<String, KeyValPair<STREAM,String>> entry : outputToInputMap.entrySet()) {
+ if (entry.getValue().getKey() == STREAM.LEFT) {
+ setters.get(entry.getKey()).set(o, leftGettersStream.get(entry.getValue().getValue()).get(lObj));
+ }
+ }
+ } else {
+ setObjectForResult(leftGettersStream, lObj, o);
+ }
result.add(o);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e29b7c6d/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
index 2d30346..9d22229 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.base.Throwables;
import com.google.common.collect.Multimap;
+import com.datatorrent.lib.util.KeyValPair;
import com.datatorrent.lib.util.PojoUtils;
/**
@@ -47,6 +48,11 @@ public class PojoRightOuterJoin<InputT1, InputT2>
super(outClass,leftKeys,rightKeys);
}
+ public PojoRightOuterJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys, Map<String, KeyValPair<STREAM, String>> outputToInputMap)
+ {
+ super(outClass,leftKeys,rightKeys, outputToInputMap);
+ }
+
@Override
public void addNonMatchingResult(Collection<Object> left, Map<String,PojoUtils.Getter> leftGettersStream, List<Object> result)
{
@@ -57,7 +63,15 @@ public class PojoRightOuterJoin<InputT1, InputT2>
} catch (Throwable e) {
throw Throwables.propagate(e);
}
- setObjectForResult(leftGettersStream, lObj, o);
+ if (outputToInputMap != null) {
+ for (Map.Entry<String, KeyValPair<STREAM,String>> entry : outputToInputMap.entrySet()) {
+ if (entry.getValue().getKey() == STREAM.RIGHT) {
+ setters.get(entry.getKey()).set(o, leftGettersStream.get(entry.getValue().getValue()).get(lObj));
+ }
+ }
+ } else {
+ setObjectForResult(leftGettersStream, lObj, o);
+ }
result.add(o);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e29b7c6d/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
index 483ffdd..0861ca6 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
@@ -27,6 +27,8 @@ import org.junit.Test;
import com.google.common.collect.Multimap;
+import com.datatorrent.lib.util.KeyValPair;
+
/**
* Test for {@link PojoInnerJoin}.
*/
@@ -288,4 +290,34 @@ public class PojoInnerJoinTest
Assert.assertEquals("Josh", testOutClass.getUName());
Assert.assertEquals(12, testOutClass.getAge());
}
+
+ @Test
+ public void PojoInnerJoinTestWithMap()
+ {
+ String[] leftKeys = {"uId", "uName"};
+ String[] rightKeys = {"uId", "uNickName"};
+ Map<String,KeyValPair<AbstractPojoJoin.STREAM, String>> outputInputMap = new HashMap<>();
+ outputInputMap.put("uId",new KeyValPair<>(AbstractPojoJoin.STREAM.LEFT,"uId"));
+ outputInputMap.put("age",new KeyValPair<>(AbstractPojoJoin.STREAM.RIGHT,"age"));
+ PojoInnerJoin<TestPojo1, TestPojo3> pij = new PojoInnerJoin<>(TestOutMultipleKeysClass.class, leftKeys, rightKeys, outputInputMap);
+
+ List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue();
+
+ Assert.assertEquals(2, accu.size());
+
+ accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
+ accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
+
+ accu = pij.accumulate2(accu, new TestPojo3(1, "Josh", 12));
+ accu = pij.accumulate2(accu, new TestPojo3(3, "ECE", 13));
+
+ Assert.assertEquals(1, pij.getOutput(accu).size());
+
+ Object o = pij.getOutput(accu).get(0);
+ Assert.assertTrue(o instanceof TestOutMultipleKeysClass);
+ TestOutMultipleKeysClass testOutClass = (TestOutMultipleKeysClass)o;
+ Assert.assertEquals(1, testOutClass.getUId());
+ Assert.assertEquals(null, testOutClass.getUName());
+ Assert.assertEquals(12, testOutClass.getAge());
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e29b7c6d/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
index fd9d29b..06d1f2d 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
@@ -18,8 +18,10 @@
*/
package org.apache.apex.malhar.lib.window.accumulation;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.junit.Assert;
@@ -27,6 +29,8 @@ import org.junit.Test;
import com.google.common.collect.Multimap;
+import com.datatorrent.lib.util.KeyValPair;
+
/**
* Test for POJO outer join accumulations
*/
@@ -312,4 +316,126 @@ public class PojoOuterJoinTest
}
Assert.assertEquals(3,checkMap.size());
}
+
+ @Test
+ public void PojoLeftOuterJoinTestWithMap()
+ {
+ String[] leftKeys = {"uId", "uName"};
+ String[] rightKeys = {"uId", "uNickName"};
+ Map<String,KeyValPair<AbstractPojoJoin.STREAM, String>> outputInputMap = new HashMap<>();
+ outputInputMap.put("uId",new KeyValPair<>(AbstractPojoJoin.STREAM.LEFT,"uId"));
+ outputInputMap.put("age",new KeyValPair<>(AbstractPojoJoin.STREAM.RIGHT,"age"));
+ PojoLeftOuterJoin<TestPojo1, TestPojo3> pij = new PojoLeftOuterJoin<>(TestOutClass.class, leftKeys, rightKeys, outputInputMap);
+
+ List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue();
+
+ Assert.assertEquals(2, accu.size());
+
+ accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
+ accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
+
+ accu = pij.accumulate2(accu, new TestPojo3(1, "Josh", 12));
+ accu = pij.accumulate2(accu, new TestPojo3(3, "ECE", 13));
+
+ List result = pij.getOutput(accu);
+ Assert.assertEquals(2, result.size());
+ Object o = result.get(0);
+ Assert.assertTrue(o instanceof TestOutClass);
+ TestOutClass testOutClass = (TestOutClass)o;
+ int uId = testOutClass.getUId();
+ if (uId == 1) {
+ checkNameAge(null,12,testOutClass);
+ o = result.get(1);
+ Assert.assertTrue(o instanceof TestOutClass);
+ testOutClass = (TestOutClass)o;
+ uId = testOutClass.getUId();
+ Assert.assertEquals(2, uId);
+ checkNameAge(null,0,testOutClass);
+ } else if (uId == 2) {
+ checkNameAge(null,0,testOutClass);
+ o = result.get(1);
+ Assert.assertTrue(o instanceof TestOutClass);
+ testOutClass = (TestOutClass)o;
+ uId = testOutClass.getUId();
+ Assert.assertEquals(1, uId);
+ checkNameAge(null,12,testOutClass);
+ }
+ }
+
+ @Test
+ public void PojoRightOuterJoinTestWithMap()
+ {
+ String[] leftKeys = {"uId", "uName"};
+ String[] rightKeys = {"uId", "uNickName"};
+ Map<String,KeyValPair<AbstractPojoJoin.STREAM, String>> outputInputMap = new HashMap<>();
+ outputInputMap.put("uId",new KeyValPair<>(AbstractPojoJoin.STREAM.LEFT,"uId"));
+ outputInputMap.put("age",new KeyValPair<>(AbstractPojoJoin.STREAM.RIGHT,"age"));
+ PojoRightOuterJoin<TestPojo1, TestPojo3> pij = new PojoRightOuterJoin<>(TestOutClass.class, leftKeys, rightKeys, outputInputMap);
+
+ List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue();
+
+ Assert.assertEquals(2, accu.size());
+
+ accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
+ accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
+
+ accu = pij.accumulate2(accu, new TestPojo3(1, "Josh", 12));
+ accu = pij.accumulate2(accu, new TestPojo3(3, "Bob", 13));
+
+ List result = pij.getOutput(accu);
+ Assert.assertEquals(2, result.size());
+ Object o = result.get(0);
+ Assert.assertTrue(o instanceof TestOutClass);
+ TestOutClass testOutClass = (TestOutClass)o;
+ int uId = testOutClass.getUId();
+ if (uId == 1) {
+ checkNameAge(null,12,testOutClass);
+ o = result.get(1);
+ Assert.assertTrue(o instanceof TestOutClass);
+ testOutClass = (TestOutClass)o;
+ uId = testOutClass.getUId();
+ Assert.assertEquals(0, uId);
+ checkNameAge(null,13,testOutClass);
+ } else if (uId == 0) {
+ checkNameAge(null,13,testOutClass);
+ o = result.get(1);
+ Assert.assertTrue(o instanceof TestOutClass);
+ testOutClass = (TestOutClass)o;
+ uId = testOutClass.getUId();
+ Assert.assertEquals(1, uId);
+ checkNameAge(null,12,testOutClass);
+ }
+ }
+
+ @Test
+ public void PojoFullOuterJoinTestWithMap()
+ {
+ String[] leftKeys = {"uId", "uName"};
+ String[] rightKeys = {"uId", "uNickName"};
+ Map<String,KeyValPair<AbstractPojoJoin.STREAM, String>> outputInputMap = new HashMap<>();
+ outputInputMap.put("uId",new KeyValPair<>(AbstractPojoJoin.STREAM.LEFT,"uId"));
+ outputInputMap.put("age",new KeyValPair<>(AbstractPojoJoin.STREAM.RIGHT,"age"));
+ PojoFullOuterJoin<TestPojo1, TestPojo3> pij = new PojoFullOuterJoin<>(TestOutClass.class, leftKeys, rightKeys, outputInputMap);
+
+ List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue();
+
+ Assert.assertEquals(2, accu.size());
+
+ accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
+ accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
+
+ accu = pij.accumulate2(accu, new TestPojo3(1, "Josh", 12));
+ accu = pij.accumulate2(accu, new TestPojo3(3, "Bob", 13));
+
+ Assert.assertEquals(3, pij.getOutput(accu).size());
+ Set<Integer> checkMap = new HashSet<>();
+ for ( int i = 0; i < 3; i++ ) {
+ Object o = pij.getOutput(accu).get(i);
+ Assert.assertTrue(o instanceof TestOutClass);
+ TestOutClass testOutClass = (TestOutClass)o;
+ int uId = testOutClass.getUId();
+ checkMap.add(uId);
+ }
+ Assert.assertEquals(3,checkMap.size());
+ }
}