You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2017/03/21 11:18:49 UTC

apex-malhar git commit: APEXMALHAR-2400 removing dependency of output fieldnames to input field names

Repository: apex-malhar
Updated Branches:
  refs/heads/master 6a31088c0 -> e29b7c6de


APEXMALHAR-2400 removing dependency of output fieldnames to input field names


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/e29b7c6d
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/e29b7c6d
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/e29b7c6d

Branch: refs/heads/master
Commit: e29b7c6de8187cb1f2209a495dd0caac97c4dd61
Parents: 6a31088
Author: Hitesh-Scorpio <fo...@gmail.com>
Authored: Tue Mar 14 15:56:48 2017 +0530
Committer: Hitesh-Scorpio <fo...@gmail.com>
Committed: Tue Mar 21 15:30:08 2017 +0530

----------------------------------------------------------------------
 .../window/accumulation/AbstractPojoJoin.java   |  44 ++++++-
 .../window/accumulation/PojoFullOuterJoin.java  |  37 +++++-
 .../lib/window/accumulation/PojoInnerJoin.java  |   6 +
 .../window/accumulation/PojoLeftOuterJoin.java  |  16 ++-
 .../window/accumulation/PojoRightOuterJoin.java |  16 ++-
 .../window/accumulation/PojoInnerJoinTest.java  |  32 +++++
 .../window/accumulation/PojoOuterJoinTest.java  | 126 +++++++++++++++++++
 7 files changed, 269 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e29b7c6d/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
index a0b3fb3..8fe7df3 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
@@ -33,8 +33,11 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 
+import com.datatorrent.lib.util.KeyValPair;
 import com.datatorrent.lib.util.PojoUtils;
 
+import static org.apache.apex.malhar.lib.window.accumulation.AbstractPojoJoin.STREAM.LEFT;
+
 /**
  * Join Accumulation for Pojo Streams.
  *
@@ -47,9 +50,14 @@ public abstract class AbstractPojoJoin<InputT1, InputT2>
   protected Class<?> outClass;
   private transient Map<String,PojoUtils.Getter> gettersStream1;
   private transient Map<String,PojoUtils.Getter> gettersStream2;
-  private transient Map<String,PojoUtils.Setter> setters;
+  protected transient Map<String,PojoUtils.Setter> setters;
+  protected transient Map<String, KeyValPair<STREAM, String>> outputToInputMap;
   protected transient String[] leftKeys;
   protected transient String[] rightKeys;
+  public enum STREAM
+  {
+    LEFT, RIGHT
+  }
 
   public AbstractPojoJoin()
   {
@@ -58,6 +66,10 @@ public abstract class AbstractPojoJoin<InputT1, InputT2>
     outClass = null;
   }
 
+  /**
+   * This constructor will be used when the user wants to include all the fields of Output POJO
+   * and the field names of output POJO match the field names of POJO coming on input streams.
+   */
   public AbstractPojoJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys)
   {
     if (leftKeys.length != rightKeys.length) {
@@ -68,6 +80,16 @@ public abstract class AbstractPojoJoin<InputT1, InputT2>
     this.outClass = outClass;
   }
 
+  /**
+   * This constructor will be used when the user wants to include some specific
+   * fields of the output POJO and/or wants to have a mapping of the fields of output
+   * POJO to POJO coming on input streams.
+   */
+  public AbstractPojoJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys, Map<String, KeyValPair<STREAM, String>> outputToInputMap)
+  {
+    this(outClass,leftKeys,rightKeys);
+    this.outputToInputMap = outputToInputMap;
+  }
 
   private void createSetters()
   {
@@ -220,8 +242,24 @@ public abstract class AbstractPojoJoin<InputT1, InputT2>
         }
         for (Object lObj:left) {
           for (Object rObj:right) {
-            setObjectForResult(leftGettersStream, lObj,o);
-            setObjectForResult(rightGettersStream, rObj,o);
+            if (outputToInputMap != null) {
+              for (Map.Entry<String, KeyValPair<STREAM,String>> entry : outputToInputMap.entrySet()) {
+                KeyValPair<STREAM,String> kv = entry.getValue();
+                Object reqObject;
+                Map<String,PojoUtils.Getter> reqStream;
+                if (kv.getKey() == LEFT) {
+                  reqObject = leftStreamIndex == 0 ? lObj : rObj;
+                  reqStream = leftStreamIndex == 0 ? leftGettersStream : rightGettersStream;
+                } else {
+                  reqObject = leftStreamIndex == 0 ? rObj : lObj;
+                  reqStream = leftStreamIndex == 0 ? rightGettersStream : leftGettersStream;
+                }
+                setters.get(entry.getKey()).set(o,reqStream.get(entry.getValue().getValue()).get(reqObject));
+              }
+            } else {
+              setObjectForResult(leftGettersStream, lObj, o);
+              setObjectForResult(rightGettersStream, rObj, o);
+            }
           }
           result.add(o);
         }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e29b7c6d/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
index edee827..c74ded3 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Multimap;
 
+import com.datatorrent.lib.util.KeyValPair;
 import com.datatorrent.lib.util.PojoUtils;
 
 /**
@@ -47,6 +48,11 @@ public class PojoFullOuterJoin<InputT1, InputT2>
     super(outClass,leftKeys,rightKeys);
   }
 
+  public PojoFullOuterJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys, Map<String, KeyValPair<STREAM, String>> outputToInputMap)
+  {
+    super(outClass,leftKeys,rightKeys, outputToInputMap);
+  }
+
   @Override
   public void addNonMatchingResult(Collection<Object> left, Map<String,PojoUtils.Getter> leftGettersStream, List<Object> result)
   {
@@ -57,7 +63,15 @@ public class PojoFullOuterJoin<InputT1, InputT2>
       } catch (Throwable e) {
         throw Throwables.propagate(e);
       }
-      setObjectForResult(leftGettersStream, lObj, o);
+      if (outputToInputMap != null) {
+        for (Map.Entry<String, KeyValPair<STREAM,String>> entry : outputToInputMap.entrySet()) {
+          if (entry.getValue().getKey() == STREAM.LEFT) {
+            setters.get(entry.getKey()).set(o, leftGettersStream.get(entry.getValue().getValue()).get(lObj));
+          }
+        }
+      } else {
+        setObjectForResult(leftGettersStream, lObj, o);
+      }
       result.add(o);
     }
   }
@@ -67,13 +81,30 @@ public class PojoFullOuterJoin<InputT1, InputT2>
       Map<String,PojoUtils.Getter> rightGettersStream, List<Object> result)
   {
     for (Object key : rightStream.keySet()) {
-      addNonMatchingResult(rightStream.get((List)key), rightGettersStream, result);
+      if (outputToInputMap == null) {
+        addNonMatchingResult(rightStream.get((List)key), rightGettersStream, result);
+      } else {
+        for (Object obj:  rightStream.get((List)key)) {
+          Object o;
+          try {
+            o = outClass.newInstance();
+          } catch (Throwable e) {
+            throw Throwables.propagate(e);
+          }
+          for (Map.Entry<String, KeyValPair<STREAM,String>> entry : outputToInputMap.entrySet()) {
+            if (entry.getValue().getKey() == STREAM.RIGHT) {
+              setters.get(entry.getKey()).set(o, rightGettersStream.get(entry.getValue().getValue()).get(obj));
+            }
+          }
+          result.add(o);
+        }
+      }
     }
   }
 
   @Override
   public int getLeftStreamIndex()
   {
-    return 1;
+    return 0;
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e29b7c6d/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
index a6421fa..1dcc7e5 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import com.google.common.collect.Multimap;
 
+import com.datatorrent.lib.util.KeyValPair;
 import com.datatorrent.lib.util.PojoUtils;
 
 /**
@@ -59,6 +60,11 @@ public class PojoInnerJoin<InputT1, InputT2>
     super(outClass,leftKeys,rightKeys);
   }
 
+  public PojoInnerJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys, Map<String, KeyValPair<STREAM, String>> outputToInputMap)
+  {
+    super(outClass,leftKeys,rightKeys, outputToInputMap);
+  }
+
   @Override
   public void addNonMatchingResult(Collection<Object> left, Map<String,PojoUtils.Getter> leftGettersStream, List<Object> result)
   {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e29b7c6d/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
index 4317e30..5405ca5 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Multimap;
 
+import com.datatorrent.lib.util.KeyValPair;
 import com.datatorrent.lib.util.PojoUtils;
 
 /**
@@ -47,6 +48,11 @@ public class PojoLeftOuterJoin<InputT1, InputT2>
     super(outClass,leftKeys,rightKeys);
   }
 
+  public PojoLeftOuterJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys, Map<String, KeyValPair<STREAM, String>> outputToInputMap)
+  {
+    super(outClass,leftKeys,rightKeys, outputToInputMap);
+  }
+
   @Override
   public void addNonMatchingResult(Collection<Object> left, Map<String,PojoUtils.Getter> leftGettersStream, List<Object> result)
   {
@@ -57,7 +63,15 @@ public class PojoLeftOuterJoin<InputT1, InputT2>
       } catch (Throwable e) {
         throw Throwables.propagate(e);
       }
-      setObjectForResult(leftGettersStream, lObj, o);
+      if (outputToInputMap != null) {
+        for (Map.Entry<String, KeyValPair<STREAM,String>> entry : outputToInputMap.entrySet()) {
+          if (entry.getValue().getKey() == STREAM.LEFT) {
+            setters.get(entry.getKey()).set(o, leftGettersStream.get(entry.getValue().getValue()).get(lObj));
+          }
+        }
+      } else {
+        setObjectForResult(leftGettersStream, lObj, o);
+      }
       result.add(o);
     }
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e29b7c6d/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
index 2d30346..9d22229 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Multimap;
 
+import com.datatorrent.lib.util.KeyValPair;
 import com.datatorrent.lib.util.PojoUtils;
 
 /**
@@ -47,6 +48,11 @@ public class PojoRightOuterJoin<InputT1, InputT2>
     super(outClass,leftKeys,rightKeys);
   }
 
+  public PojoRightOuterJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys, Map<String, KeyValPair<STREAM, String>> outputToInputMap)
+  {
+    super(outClass,leftKeys,rightKeys, outputToInputMap);
+  }
+
   @Override
   public void addNonMatchingResult(Collection<Object> left, Map<String,PojoUtils.Getter> leftGettersStream, List<Object> result)
   {
@@ -57,7 +63,15 @@ public class PojoRightOuterJoin<InputT1, InputT2>
       } catch (Throwable e) {
         throw Throwables.propagate(e);
       }
-      setObjectForResult(leftGettersStream, lObj, o);
+      if (outputToInputMap != null) {
+        for (Map.Entry<String, KeyValPair<STREAM,String>> entry : outputToInputMap.entrySet()) {
+          if (entry.getValue().getKey() == STREAM.RIGHT) {
+            setters.get(entry.getKey()).set(o, leftGettersStream.get(entry.getValue().getValue()).get(lObj));
+          }
+        }
+      } else {
+        setObjectForResult(leftGettersStream, lObj, o);
+      }
       result.add(o);
     }
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e29b7c6d/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
index 483ffdd..0861ca6 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
@@ -27,6 +27,8 @@ import org.junit.Test;
 
 import com.google.common.collect.Multimap;
 
+import com.datatorrent.lib.util.KeyValPair;
+
 /**
  * Test for {@link PojoInnerJoin}.
  */
@@ -288,4 +290,34 @@ public class PojoInnerJoinTest
     Assert.assertEquals("Josh", testOutClass.getUName());
     Assert.assertEquals(12, testOutClass.getAge());
   }
+
+  @Test
+  public void PojoInnerJoinTestWithMap()
+  {
+    String[] leftKeys = {"uId", "uName"};
+    String[] rightKeys = {"uId", "uNickName"};
+    Map<String,KeyValPair<AbstractPojoJoin.STREAM, String>> outputInputMap = new HashMap<>();
+    outputInputMap.put("uId",new KeyValPair<>(AbstractPojoJoin.STREAM.LEFT,"uId"));
+    outputInputMap.put("age",new KeyValPair<>(AbstractPojoJoin.STREAM.RIGHT,"age"));
+    PojoInnerJoin<TestPojo1, TestPojo3> pij = new PojoInnerJoin<>(TestOutMultipleKeysClass.class, leftKeys, rightKeys, outputInputMap);
+
+    List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue();
+
+    Assert.assertEquals(2, accu.size());
+
+    accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
+    accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
+
+    accu = pij.accumulate2(accu, new TestPojo3(1, "Josh", 12));
+    accu = pij.accumulate2(accu, new TestPojo3(3, "ECE", 13));
+
+    Assert.assertEquals(1, pij.getOutput(accu).size());
+
+    Object o = pij.getOutput(accu).get(0);
+    Assert.assertTrue(o instanceof TestOutMultipleKeysClass);
+    TestOutMultipleKeysClass testOutClass = (TestOutMultipleKeysClass)o;
+    Assert.assertEquals(1, testOutClass.getUId());
+    Assert.assertEquals(null, testOutClass.getUName());
+    Assert.assertEquals(12, testOutClass.getAge());
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e29b7c6d/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
index fd9d29b..06d1f2d 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
@@ -18,8 +18,10 @@
  */
 package org.apache.apex.malhar.lib.window.accumulation;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.junit.Assert;
@@ -27,6 +29,8 @@ import org.junit.Test;
 
 import com.google.common.collect.Multimap;
 
+import com.datatorrent.lib.util.KeyValPair;
+
 /**
  * Test for POJO outer join accumulations
  */
@@ -312,4 +316,126 @@ public class PojoOuterJoinTest
     }
     Assert.assertEquals(3,checkMap.size());
   }
+
+  @Test
+  public void PojoLeftOuterJoinTestWithMap()
+  {
+    String[] leftKeys = {"uId", "uName"};
+    String[] rightKeys = {"uId", "uNickName"};
+    Map<String,KeyValPair<AbstractPojoJoin.STREAM, String>> outputInputMap = new HashMap<>();
+    outputInputMap.put("uId",new KeyValPair<>(AbstractPojoJoin.STREAM.LEFT,"uId"));
+    outputInputMap.put("age",new KeyValPair<>(AbstractPojoJoin.STREAM.RIGHT,"age"));
+    PojoLeftOuterJoin<TestPojo1, TestPojo3> pij = new PojoLeftOuterJoin<>(TestOutClass.class, leftKeys, rightKeys, outputInputMap);
+
+    List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue();
+
+    Assert.assertEquals(2, accu.size());
+
+    accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
+    accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
+
+    accu = pij.accumulate2(accu, new TestPojo3(1, "Josh", 12));
+    accu = pij.accumulate2(accu, new TestPojo3(3, "ECE", 13));
+
+    List result = pij.getOutput(accu);
+    Assert.assertEquals(2, result.size());
+    Object o = result.get(0);
+    Assert.assertTrue(o instanceof TestOutClass);
+    TestOutClass testOutClass = (TestOutClass)o;
+    int uId = testOutClass.getUId();
+    if (uId == 1) {
+      checkNameAge(null,12,testOutClass);
+      o = result.get(1);
+      Assert.assertTrue(o instanceof TestOutClass);
+      testOutClass = (TestOutClass)o;
+      uId = testOutClass.getUId();
+      Assert.assertEquals(2, uId);
+      checkNameAge(null,0,testOutClass);
+    } else if (uId == 2) {
+      checkNameAge(null,0,testOutClass);
+      o = result.get(1);
+      Assert.assertTrue(o instanceof TestOutClass);
+      testOutClass = (TestOutClass)o;
+      uId = testOutClass.getUId();
+      Assert.assertEquals(1, uId);
+      checkNameAge(null,12,testOutClass);
+    }
+  }
+
+  @Test
+  public void PojoRightOuterJoinTestWithMap()
+  {
+    String[] leftKeys = {"uId", "uName"};
+    String[] rightKeys = {"uId", "uNickName"};
+    Map<String,KeyValPair<AbstractPojoJoin.STREAM, String>> outputInputMap = new HashMap<>();
+    outputInputMap.put("uId",new KeyValPair<>(AbstractPojoJoin.STREAM.LEFT,"uId"));
+    outputInputMap.put("age",new KeyValPair<>(AbstractPojoJoin.STREAM.RIGHT,"age"));
+    PojoRightOuterJoin<TestPojo1, TestPojo3> pij = new PojoRightOuterJoin<>(TestOutClass.class, leftKeys, rightKeys, outputInputMap);
+
+    List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue();
+
+    Assert.assertEquals(2, accu.size());
+
+    accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
+    accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
+
+    accu = pij.accumulate2(accu, new TestPojo3(1, "Josh", 12));
+    accu = pij.accumulate2(accu, new TestPojo3(3, "Bob", 13));
+
+    List result = pij.getOutput(accu);
+    Assert.assertEquals(2, result.size());
+    Object o = result.get(0);
+    Assert.assertTrue(o instanceof TestOutClass);
+    TestOutClass testOutClass = (TestOutClass)o;
+    int uId = testOutClass.getUId();
+    if (uId == 1) {
+      checkNameAge(null,12,testOutClass);
+      o = result.get(1);
+      Assert.assertTrue(o instanceof TestOutClass);
+      testOutClass = (TestOutClass)o;
+      uId = testOutClass.getUId();
+      Assert.assertEquals(0, uId);
+      checkNameAge(null,13,testOutClass);
+    } else if (uId == 0) {
+      checkNameAge(null,13,testOutClass);
+      o = result.get(1);
+      Assert.assertTrue(o instanceof TestOutClass);
+      testOutClass = (TestOutClass)o;
+      uId = testOutClass.getUId();
+      Assert.assertEquals(1, uId);
+      checkNameAge(null,12,testOutClass);
+    }
+  }
+
+  @Test
+  public void PojoFullOuterJoinTestWithMap()
+  {
+    String[] leftKeys = {"uId", "uName"};
+    String[] rightKeys = {"uId", "uNickName"};
+    Map<String,KeyValPair<AbstractPojoJoin.STREAM, String>> outputInputMap = new HashMap<>();
+    outputInputMap.put("uId",new KeyValPair<>(AbstractPojoJoin.STREAM.LEFT,"uId"));
+    outputInputMap.put("age",new KeyValPair<>(AbstractPojoJoin.STREAM.RIGHT,"age"));
+    PojoFullOuterJoin<TestPojo1, TestPojo3> pij = new PojoFullOuterJoin<>(TestOutClass.class, leftKeys, rightKeys, outputInputMap);
+
+    List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue();
+
+    Assert.assertEquals(2, accu.size());
+
+    accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
+    accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
+
+    accu = pij.accumulate2(accu, new TestPojo3(1, "Josh", 12));
+    accu = pij.accumulate2(accu, new TestPojo3(3, "Bob", 13));
+
+    Assert.assertEquals(3, pij.getOutput(accu).size());
+    Set<Integer> checkMap = new HashSet<>();
+    for ( int i = 0; i < 3; i++ ) {
+      Object o = pij.getOutput(accu).get(i);
+      Assert.assertTrue(o instanceof TestOutClass);
+      TestOutClass testOutClass = (TestOutClass)o;
+      int uId = testOutClass.getUId();
+      checkMap.add(uId);
+    }
+    Assert.assertEquals(3,checkMap.size());
+  }
 }