You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2017/03/08 10:09:39 UTC

apex-malhar git commit: APEXMALHAR-2429 Ambiguity in passing key parameter to Join accumulation

Repository: apex-malhar
Updated Branches:
  refs/heads/master 032c6672f -> 6dcd82120


APEXMALHAR-2429 Ambiguity in passing key parameter to Join accumulation


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/6dcd8212
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/6dcd8212
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/6dcd8212

Branch: refs/heads/master
Commit: 6dcd821208ce33cf5a0fd9472e882e17cf524731
Parents: 032c667
Author: Hitesh-Scorpio <fo...@gmail.com>
Authored: Mon Mar 6 16:10:45 2017 +0530
Committer: Hitesh-Scorpio <fo...@gmail.com>
Committed: Wed Mar 8 11:02:43 2017 +0530

----------------------------------------------------------------------
 .../window/accumulation/AbstractPojoJoin.java   | 35 +++++++++++---------
 .../window/accumulation/PojoFullOuterJoin.java  |  5 +--
 .../lib/window/accumulation/PojoInnerJoin.java  | 16 ++++++++-
 .../window/accumulation/PojoLeftOuterJoin.java  |  6 ++--
 .../window/accumulation/PojoRightOuterJoin.java |  6 ++--
 .../window/accumulation/PojoInnerJoinTest.java  | 27 +++++++++++++++
 .../window/accumulation/PojoOuterJoinTest.java  | 14 ++++++--
 7 files changed, 82 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6dcd8212/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
index 354dd90..1634cb9 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
@@ -20,7 +20,6 @@ package org.apache.apex.malhar.lib.window.accumulation;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -43,30 +42,34 @@ import com.datatorrent.lib.util.PojoUtils;
 public abstract class AbstractPojoJoin<InputT1, InputT2>
     implements MergeAccumulation<InputT1, InputT2, List<List<Map<String, Object>>>, List<?>>
 {
-  protected final String[] keys;
-  protected final Class<?> outClass;
+  protected String[] keys;
+  protected Class<?> outClass;
   private transient Map<String,PojoUtils.Getter> gettersStream1;
   private transient Map<String,PojoUtils.Getter> gettersStream2;
   private transient Map<String,PojoUtils.Setter> setters;
   protected transient Set<String> keySetStream2;
   protected transient Set<String> keySetStream1;
+  protected String[] leftKeys; // not transient: constructor-supplied, never rebuilt lazily
+  protected String[] rightKeys; // not transient: constructor-supplied, never rebuilt lazily
 
   public AbstractPojoJoin()
   {
-    keys = new String[]{};
+    leftKeys = new String[]{};
+    rightKeys = new String[]{};
     outClass = null;
   }
 
-  public AbstractPojoJoin(Class<?> outClass, String... keys)
+  public AbstractPojoJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys)
   {
-    if (keys.length % 2 != 0) {
-      throw new IllegalArgumentException("Wrong number of keys.");
+    if (leftKeys.length != rightKeys.length) {
+      throw new IllegalArgumentException("Number of keys in left stream should match in right stream");
     }
-
-    this.keys = Arrays.copyOf(keys, keys.length);
+    this.leftKeys = leftKeys.clone(); // defensive copy, as the removed Arrays.copyOf did
+    this.rightKeys = rightKeys.clone(); // defensive copy: caller may reuse its array
     this.outClass = outClass;
   }
 
+
   private void createSetters()
   {
     Field[] fields = outClass.getDeclaredFields();
@@ -174,9 +177,10 @@ public abstract class AbstractPojoJoin<InputT1, InputT2>
       createSetters();
       keySetStream2 = new HashSet<>();
       keySetStream1 = new HashSet<>();
-      for (int i = 0; i < keys.length; i = i + 2) {
-        keySetStream1.add(keys[i]);
-        keySetStream2.add(keys[i + 1]);
+      int lIndex = getLeftStreamIndex();
+      for (int i = 0; i < leftKeys.length; i++) {
+        keySetStream1.add(lIndex == 0 ? leftKeys[i] : rightKeys[i]);
+        keySetStream2.add(lIndex == 1 ? leftKeys[i] : rightKeys[i]);
       }
     }
 
@@ -232,9 +236,10 @@ public abstract class AbstractPojoJoin<InputT1, InputT2>
 
   public Map<String, Object> joinTwoMapsWithKeys(Map<String, Object> map1, Map<String, Object> map2)
   {
-    for (int i = 0; i < keys.length; i = i + 2) {
-      String key1 = keys[i];
-      String key2 = keys[i + 1];
+    int lIndex = getLeftStreamIndex();
+    for (int i = 0; i < leftKeys.length; i++) {
+      String key1 = lIndex == 0 ? leftKeys[i] : rightKeys[i];
+      String key2 = lIndex == 1 ? leftKeys[i] : rightKeys[i];
       if (!map1.get(key1).equals(map2.get(key2))) {
         return null;
       }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6dcd8212/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
index 8ad0467..61b37f3 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
@@ -39,11 +39,12 @@ public class PojoFullOuterJoin<InputT1, InputT2>
    super();
   }
 
-  public PojoFullOuterJoin(int num, Class<?> outClass, String... keys)
+  public PojoFullOuterJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys)
   {
-    super(outClass,keys);
+    super(outClass,leftKeys,rightKeys);
   }
 
+
   @Override
   public void addNonMatchingResult(List result, Map requiredMap, Set nullFields)
   {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6dcd8212/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
index ceb17dd..3654e6a 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
@@ -35,10 +35,24 @@ public class PojoInnerJoin<InputT1, InputT2>
    super();
   }
 
+  @Deprecated
   public PojoInnerJoin(int num, Class<?> outClass, String... keys)
   {
-    super(outClass,keys);
+    this.outClass = outClass;
+    if (keys.length % 2 != 0) {
+      throw new IllegalArgumentException("Wrong number of keys.");
+    }
+    this.leftKeys = new String[keys.length / 2];
+    this.rightKeys = new String[keys.length / 2];
+    for (int i = 0,j = 0; i < keys.length; i = i + 2, j++) {
+      this.leftKeys[j] = keys[i];
+      this.rightKeys[j] = keys[i + 1];
+    }
+  }
 
+  public PojoInnerJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys)
+  {
+    super(outClass,leftKeys,rightKeys);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6dcd8212/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
index 0ee3e00..c6e899c 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
@@ -37,12 +37,12 @@ public class PojoLeftOuterJoin<InputT1, InputT2>
    super();
   }
 
-  public PojoLeftOuterJoin(int num, Class<?> outClass, String... keys)
+  public PojoLeftOuterJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys)
   {
-    super(outClass,keys);
-
+    super(outClass,leftKeys,rightKeys);
   }
 
+
   @Override
   public void addNonMatchingResult(List result, Map requiredMap, Set nullFields)
   {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6dcd8212/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
index 60b0252..b87d4bd 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
@@ -37,12 +37,12 @@ public class PojoRightOuterJoin<InputT1, InputT2>
    super();
   }
 
-  public PojoRightOuterJoin(int num, Class<?> outClass, String... keys)
+  public PojoRightOuterJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys)
   {
-    super(outClass,keys);
-
+    super(outClass,leftKeys,rightKeys);
   }
 
+
   @Override
   public void addNonMatchingResult(List result, Map requiredMap, Set nullFields)
   {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6dcd8212/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
index fbbb5b1..377a684 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
@@ -259,4 +259,31 @@ public class PojoInnerJoinTest
     Assert.assertEquals("Josh", testOutClass.getUName());
     Assert.assertEquals(12, testOutClass.getAge());
   }
+
+  @Test
+  public void PojoInnerJoinTestSeparateLeftAndRightKeys()
+  {
+    String[] leftKeys = {"uId", "uName"};
+    String[] rightKeys = {"uId", "uNickName"};
+    PojoInnerJoin<TestPojo1, TestPojo3> pij = new PojoInnerJoin<>(TestOutMultipleKeysClass.class, leftKeys, rightKeys);
+
+    List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue();
+
+    Assert.assertEquals(2, accu.size());
+
+    accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
+    accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
+
+    accu = pij.accumulate2(accu, new TestPojo3(1, "Josh", 12));
+    accu = pij.accumulate2(accu, new TestPojo3(3, "ECE", 13));
+
+    Assert.assertEquals(1, pij.getOutput(accu).size());
+
+    Object o = pij.getOutput(accu).get(0);
+    Assert.assertTrue(o instanceof TestOutMultipleKeysClass);
+    TestOutMultipleKeysClass testOutClass = (TestOutMultipleKeysClass)o;
+    Assert.assertEquals(1, testOutClass.getUId());
+    Assert.assertEquals("Josh", testOutClass.getUName());
+    Assert.assertEquals(12, testOutClass.getAge());
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6dcd8212/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
index aaa7de3..e3f5bb7 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
@@ -206,7 +206,9 @@ public class PojoOuterJoinTest
   @Test
   public void PojoLeftOuterJoinTest()
   {
-    PojoLeftOuterJoin<TestPojo1, TestPojo3> pij = new PojoLeftOuterJoin<>(2, TestOutClass.class, "uId", "uId");
+    String[] leftKeys = {"uId"};
+    String[] rightKeys = {"uId"};
+    PojoLeftOuterJoin<TestPojo1, TestPojo3> pij = new PojoLeftOuterJoin<>(TestOutClass.class, leftKeys, rightKeys);
 
     List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue();
 
@@ -231,7 +233,10 @@ public class PojoOuterJoinTest
   @Test
   public void PojoRightOuterJoinTest()
   {
-    PojoRightOuterJoin<TestPojo1, TestPojo3> pij = new PojoRightOuterJoin<>(2, TestOutClass.class, "uId", "uId");
+    String[] leftKeys = {"uId"};
+    String[] rightKeys = {"uId"};
+    PojoRightOuterJoin<TestPojo1, TestPojo3> pij = new PojoRightOuterJoin<>(TestOutClass.class, leftKeys, rightKeys);
+
 
     List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue();
 
@@ -256,7 +261,10 @@ public class PojoOuterJoinTest
   @Test
   public void PojoFullOuterJoinTest()
   {
-    PojoFullOuterJoin<TestPojo1, TestPojo3> pij = new PojoFullOuterJoin<>(2, TestOutClass.class, "uId", "uId");
+    String[] leftKeys = {"uId"};
+    String[] rightKeys = {"uId"};
+    PojoFullOuterJoin<TestPojo1, TestPojo3> pij = new PojoFullOuterJoin<>(TestOutClass.class, leftKeys, rightKeys);
+
 
     List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue();