You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2017/03/08 10:09:39 UTC
apex-malhar git commit: APEXMALHAR-2429 Ambiguity in passing key parameter to Join accumulation
Repository: apex-malhar
Updated Branches:
refs/heads/master 032c6672f -> 6dcd82120
APEXMALHAR-2429 Ambiguity in passing key parameter to Join accumulation
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/6dcd8212
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/6dcd8212
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/6dcd8212
Branch: refs/heads/master
Commit: 6dcd821208ce33cf5a0fd9472e882e17cf524731
Parents: 032c667
Author: Hitesh-Scorpio <fo...@gmail.com>
Authored: Mon Mar 6 16:10:45 2017 +0530
Committer: Hitesh-Scorpio <fo...@gmail.com>
Committed: Wed Mar 8 11:02:43 2017 +0530
----------------------------------------------------------------------
.../window/accumulation/AbstractPojoJoin.java | 35 +++++++++++---------
.../window/accumulation/PojoFullOuterJoin.java | 5 +--
.../lib/window/accumulation/PojoInnerJoin.java | 16 ++++++++-
.../window/accumulation/PojoLeftOuterJoin.java | 6 ++--
.../window/accumulation/PojoRightOuterJoin.java | 6 ++--
.../window/accumulation/PojoInnerJoinTest.java | 27 +++++++++++++++
.../window/accumulation/PojoOuterJoinTest.java | 14 ++++++--
7 files changed, 82 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6dcd8212/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
index 354dd90..1634cb9 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java
@@ -20,7 +20,6 @@ package org.apache.apex.malhar.lib.window.accumulation;
import java.lang.reflect.Field;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -43,30 +42,34 @@ import com.datatorrent.lib.util.PojoUtils;
public abstract class AbstractPojoJoin<InputT1, InputT2>
implements MergeAccumulation<InputT1, InputT2, List<List<Map<String, Object>>>, List<?>>
{
- protected final String[] keys;
- protected final Class<?> outClass;
+ protected String[] keys;
+ protected Class<?> outClass;
private transient Map<String,PojoUtils.Getter> gettersStream1;
private transient Map<String,PojoUtils.Getter> gettersStream2;
private transient Map<String,PojoUtils.Setter> setters;
protected transient Set<String> keySetStream2;
protected transient Set<String> keySetStream1;
+ protected transient String[] leftKeys;
+ protected transient String[] rightKeys;
public AbstractPojoJoin()
{
- keys = new String[]{};
+ leftKeys = new String[]{};
+ rightKeys = new String[]{};
outClass = null;
}
- public AbstractPojoJoin(Class<?> outClass, String... keys)
+ public AbstractPojoJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys)
{
- if (keys.length % 2 != 0) {
- throw new IllegalArgumentException("Wrong number of keys.");
+ if (leftKeys.length != rightKeys.length) {
+ throw new IllegalArgumentException("Number of keys in left stream should match in right stream");
}
-
- this.keys = Arrays.copyOf(keys, keys.length);
+ this.leftKeys = leftKeys;
+ this.rightKeys = rightKeys;
this.outClass = outClass;
}
+
private void createSetters()
{
Field[] fields = outClass.getDeclaredFields();
@@ -174,9 +177,10 @@ public abstract class AbstractPojoJoin<InputT1, InputT2>
createSetters();
keySetStream2 = new HashSet<>();
keySetStream1 = new HashSet<>();
- for (int i = 0; i < keys.length; i = i + 2) {
- keySetStream1.add(keys[i]);
- keySetStream2.add(keys[i + 1]);
+ int lIndex = getLeftStreamIndex();
+ for (int i = 0; i < leftKeys.length; i++) {
+ keySetStream1.add(lIndex == 0 ? leftKeys[i] : rightKeys[i]);
+ keySetStream2.add(lIndex == 1 ? leftKeys[i] : rightKeys[i]);
}
}
@@ -232,9 +236,10 @@ public abstract class AbstractPojoJoin<InputT1, InputT2>
public Map<String, Object> joinTwoMapsWithKeys(Map<String, Object> map1, Map<String, Object> map2)
{
- for (int i = 0; i < keys.length; i = i + 2) {
- String key1 = keys[i];
- String key2 = keys[i + 1];
+ int lIndex = getLeftStreamIndex();
+ for (int i = 0; i < leftKeys.length; i++) {
+ String key1 = lIndex == 0 ? leftKeys[i] : rightKeys[i];
+ String key2 = lIndex == 1 ? leftKeys[i] : rightKeys[i];
if (!map1.get(key1).equals(map2.get(key2))) {
return null;
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6dcd8212/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
index 8ad0467..61b37f3 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java
@@ -39,11 +39,12 @@ public class PojoFullOuterJoin<InputT1, InputT2>
super();
}
- public PojoFullOuterJoin(int num, Class<?> outClass, String... keys)
+ public PojoFullOuterJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys)
{
- super(outClass,keys);
+ super(outClass,leftKeys,rightKeys);
}
+
@Override
public void addNonMatchingResult(List result, Map requiredMap, Set nullFields)
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6dcd8212/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
index ceb17dd..3654e6a 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
@@ -35,10 +35,24 @@ public class PojoInnerJoin<InputT1, InputT2>
super();
}
+ @Deprecated
public PojoInnerJoin(int num, Class<?> outClass, String... keys)
{
- super(outClass,keys);
+ this.outClass = outClass;
+ if (keys.length % 2 != 0) {
+ throw new IllegalArgumentException("Wrong number of keys.");
+ }
+ this.leftKeys = new String[keys.length / 2];
+ this.rightKeys = new String[keys.length / 2];
+ for (int i = 0,j = 0; i < keys.length; i = i + 2, j++) {
+ this.leftKeys[j] = keys[i];
+ this.rightKeys[j] = keys[i + 1];
+ }
+ }
+ public PojoInnerJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys)
+ {
+ super(outClass,leftKeys,rightKeys);
}
@Override
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6dcd8212/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
index 0ee3e00..c6e899c 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java
@@ -37,12 +37,12 @@ public class PojoLeftOuterJoin<InputT1, InputT2>
super();
}
- public PojoLeftOuterJoin(int num, Class<?> outClass, String... keys)
+ public PojoLeftOuterJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys)
{
- super(outClass,keys);
-
+ super(outClass,leftKeys,rightKeys);
}
+
@Override
public void addNonMatchingResult(List result, Map requiredMap, Set nullFields)
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6dcd8212/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
index 60b0252..b87d4bd 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java
@@ -37,12 +37,12 @@ public class PojoRightOuterJoin<InputT1, InputT2>
super();
}
- public PojoRightOuterJoin(int num, Class<?> outClass, String... keys)
+ public PojoRightOuterJoin(Class<?> outClass, String[] leftKeys, String[] rightKeys)
{
- super(outClass,keys);
-
+ super(outClass,leftKeys,rightKeys);
}
+
@Override
public void addNonMatchingResult(List result, Map requiredMap, Set nullFields)
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6dcd8212/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
index fbbb5b1..377a684 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
@@ -259,4 +259,31 @@ public class PojoInnerJoinTest
Assert.assertEquals("Josh", testOutClass.getUName());
Assert.assertEquals(12, testOutClass.getAge());
}
+
+ @Test
+ public void PojoInnerJoinTestSeparateLeftAndRightKeys()
+ {
+ String[] leftKeys = {"uId", "uName"};
+ String[] rightKeys = {"uId", "uNickName"};
+ PojoInnerJoin<TestPojo1, TestPojo3> pij = new PojoInnerJoin<>(TestOutMultipleKeysClass.class, leftKeys, rightKeys);
+
+ List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue();
+
+ Assert.assertEquals(2, accu.size());
+
+ accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
+ accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
+
+ accu = pij.accumulate2(accu, new TestPojo3(1, "Josh", 12));
+ accu = pij.accumulate2(accu, new TestPojo3(3, "ECE", 13));
+
+ Assert.assertEquals(1, pij.getOutput(accu).size());
+
+ Object o = pij.getOutput(accu).get(0);
+ Assert.assertTrue(o instanceof TestOutMultipleKeysClass);
+ TestOutMultipleKeysClass testOutClass = (TestOutMultipleKeysClass)o;
+ Assert.assertEquals(1, testOutClass.getUId());
+ Assert.assertEquals("Josh", testOutClass.getUName());
+ Assert.assertEquals(12, testOutClass.getAge());
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6dcd8212/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
index aaa7de3..e3f5bb7 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java
@@ -206,7 +206,9 @@ public class PojoOuterJoinTest
@Test
public void PojoLeftOuterJoinTest()
{
- PojoLeftOuterJoin<TestPojo1, TestPojo3> pij = new PojoLeftOuterJoin<>(2, TestOutClass.class, "uId", "uId");
+ String[] leftKeys = {"uId"};
+ String[] rightKeys = {"uId"};
+ PojoLeftOuterJoin<TestPojo1, TestPojo3> pij = new PojoLeftOuterJoin<>(TestOutClass.class, leftKeys, rightKeys);
List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue();
@@ -231,7 +233,10 @@ public class PojoOuterJoinTest
@Test
public void PojoRightOuterJoinTest()
{
- PojoRightOuterJoin<TestPojo1, TestPojo3> pij = new PojoRightOuterJoin<>(2, TestOutClass.class, "uId", "uId");
+ String[] leftKeys = {"uId"};
+ String[] rightKeys = {"uId"};
+ PojoRightOuterJoin<TestPojo1, TestPojo3> pij = new PojoRightOuterJoin<>(TestOutClass.class, leftKeys, rightKeys);
+
List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue();
@@ -256,7 +261,10 @@ public class PojoOuterJoinTest
@Test
public void PojoFullOuterJoinTest()
{
- PojoFullOuterJoin<TestPojo1, TestPojo3> pij = new PojoFullOuterJoin<>(2, TestOutClass.class, "uId", "uId");
+ String[] leftKeys = {"uId"};
+ String[] rightKeys = {"uId"};
+ PojoFullOuterJoin<TestPojo1, TestPojo3> pij = new PojoFullOuterJoin<>(TestOutClass.class, leftKeys, rightKeys);
+
List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue();