You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by tu...@apache.org on 2017/02/23 11:22:26 UTC
apex-malhar git commit: APEXMALHAR-2414 PojoInner Join accumulation was using java reflection
Repository: apex-malhar
Updated Branches:
refs/heads/master 8486493a0 -> dd5341f22
APEXMALHAR-2414 PojoInner Join accumulation was using java reflection
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/dd5341f2
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/dd5341f2
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/dd5341f2
Branch: refs/heads/master
Commit: dd5341f222b8141b76ecbc5ea9653c70b3c78c44
Parents: 8486493
Author: Hitesh-Scorpio <fo...@gmail.com>
Authored: Wed Feb 22 23:55:38 2017 +0530
Committer: Tushar R. Gosavi <tu...@apache.org>
Committed: Thu Feb 23 16:51:01 2017 +0530
----------------------------------------------------------------------
.../lib/window/accumulation/PojoInnerJoin.java | 70 +++++++++++++------
.../window/accumulation/PojoInnerJoinTest.java | 24 +++----
.../impl/PojoInnerJoinTestApplication.java | 72 +++++++++++++++++++-
3 files changed, 132 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dd5341f2/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
index 1872d19..a5b1117 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
@@ -27,9 +27,13 @@ import java.util.List;
import java.util.Map;
import org.apache.apex.malhar.lib.window.MergeAccumulation;
+import org.apache.commons.lang3.ClassUtils;
import com.google.common.base.Throwables;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.PojoUtils;
+
/**
* Inner join Accumulation for Pojo Streams.
*
@@ -40,6 +44,9 @@ public class PojoInnerJoin<InputT1, InputT2>
{
protected final String[] keys;
protected final Class<?> outClass;
+ private transient List<KeyValPair<String,PojoUtils.Getter>> gettersStream1;
+ private transient List<KeyValPair<String,PojoUtils.Getter>> gettersStream2;
+ private transient List<KeyValPair<String,PojoUtils.Setter>> setters;
public PojoInnerJoin()
{
@@ -57,9 +64,35 @@ public class PojoInnerJoin<InputT1, InputT2>
this.outClass = outClass;
}
+ private void createSetters()
+ {
+ Field[] fields = outClass.getDeclaredFields();
+ setters = new ArrayList<>();
+ for (Field field : fields) {
+ Class outputField = ClassUtils.primitiveToWrapper(field.getType());
+ String fieldName = field.getName();
+ setters.add(new KeyValPair<>(fieldName,PojoUtils.createSetter(outClass,fieldName,outputField)));
+ }
+ }
+
+ private List<KeyValPair<String,PojoUtils.Getter>> createGetters(Class<?> input)
+ {
+ Field[] fields = input.getDeclaredFields();
+ List<KeyValPair<String,PojoUtils.Getter>> getters = new ArrayList<>();
+ for (Field field : fields) {
+ Class inputField = ClassUtils.primitiveToWrapper(field.getType());
+ String fieldName = field.getName();
+ getters.add(new KeyValPair<>(fieldName,PojoUtils.createGetter(input, fieldName, inputField)));
+ }
+ return getters;
+ }
+
@Override
public List<List<Map<String, Object>>> accumulate(List<List<Map<String, Object>>> accumulatedValue, InputT1 input)
{
+ if (gettersStream1 == null) {
+ gettersStream1 = createGetters(input.getClass());
+ }
try {
return accumulateWithIndex(0, accumulatedValue, input);
} catch (NoSuchFieldException e) {
@@ -70,6 +103,9 @@ public class PojoInnerJoin<InputT1, InputT2>
@Override
public List<List<Map<String, Object>>> accumulate2(List<List<Map<String, Object>>> accumulatedValue, InputT2 input)
{
+ if (gettersStream2 == null) {
+ gettersStream2 = createGetters(input.getClass());
+ }
try {
return accumulateWithIndex(1, accumulatedValue, input);
} catch (NoSuchFieldException e) {
@@ -96,27 +132,23 @@ public class PojoInnerJoin<InputT1, InputT2>
input.getClass().getDeclaredField(keys[index]);
List<Map<String, Object>> curList = accu.get(index);
- Map map = pojoToMap(input);
+ Map map = pojoToMap(input,index + 1);
curList.add(map);
accu.set(index, curList);
return accu;
}
- private Map<String, Object> pojoToMap(Object input)
+ private Map<String, Object> pojoToMap(Object input, int streamIndex)
{
Map<String, Object> map = new HashMap<>();
+ List<KeyValPair<String,PojoUtils.Getter>> gettersStream = streamIndex == 1 ? gettersStream1 : gettersStream2;
- Field[] fields = input.getClass().getDeclaredFields();
-
- for (Field field : fields) {
- String[] words = field.getName().split("\\.");
- String fieldName = words[words.length - 1];
- field.setAccessible(true);
+ for (KeyValPair<String,PojoUtils.Getter> getter : gettersStream) {
try {
- Object value = field.get(input);
- map.put(fieldName, value);
- } catch (IllegalAccessException e) {
+ Object value = getter.getValue().get(input);
+ map.put(getter.getKey(), value);
+ } catch (Exception e) {
throw Throwables.propagate(e);
}
}
@@ -142,6 +174,10 @@ public class PojoInnerJoin<InputT1, InputT2>
// TODO: May need to revisit (use state manager).
result = getAllCombo(0, accumulatedValue, result, null);
+ if (setters == null) {
+ createSetters();
+ }
+
List<Object> out = new ArrayList<>();
for (Map<String, Object> resultMap : result) {
Object o;
@@ -150,16 +186,8 @@ public class PojoInnerJoin<InputT1, InputT2>
} catch (Throwable e) {
throw Throwables.propagate(e);
}
-
- for (Map.Entry<String, Object> entry : resultMap.entrySet()) {
- Field f;
- try {
- f = outClass.getDeclaredField(entry.getKey());
- f.setAccessible(true);
- f.set(o, entry.getValue());
- } catch (NoSuchFieldException | IllegalAccessException e) {
- throw Throwables.propagate(e);
- }
+ for (KeyValPair<String, PojoUtils.Setter> setter : setters) {
+ setter.getValue().set(o,resultMap.get(setter.getKey()));
}
out.add(o);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dd5341f2/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
index 47ce815..47c7307 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
@@ -47,22 +47,22 @@ public class PojoInnerJoinTest
this.uName = name;
}
- public int getuId()
+ public int getUId()
{
return uId;
}
- public void setuId(int uId)
+ public void setUId(int uId)
{
this.uId = uId;
}
- public String getuName()
+ public String getUName()
{
return uName;
}
- public void setuName(String uName)
+ public void setUName(String uName)
{
this.uName = uName;
}
@@ -84,12 +84,12 @@ public class PojoInnerJoinTest
this.dep = dep;
}
- public int getuId()
+ public int getUId()
{
return uId;
}
- public void setuId(int uId)
+ public void setUId(int uId)
{
this.uId = uId;
}
@@ -111,22 +111,22 @@ public class PojoInnerJoinTest
private String uName;
private String dep;
- public int getuId()
+ public int getUId()
{
return uId;
}
- public void setuId(int uId)
+ public void setUId(int uId)
{
this.uId = uId;
}
- public String getuName()
+ public String getUName()
{
return uName;
}
- public void setuName(String uName)
+ public void setUName(String uName)
{
this.uName = uName;
}
@@ -168,8 +168,8 @@ public class PojoInnerJoinTest
Object o = pij.getOutput(accu).get(0);
Assert.assertTrue(o instanceof TestOutClass);
TestOutClass testOutClass = (TestOutClass)o;
- Assert.assertEquals(1, testOutClass.getuId());
- Assert.assertEquals("Josh", testOutClass.getuName());
+ Assert.assertEquals(1, testOutClass.getUId());
+ Assert.assertEquals("Josh", testOutClass.getUName());
Assert.assertEquals("CS", testOutClass.getDep());
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dd5341f2/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java
index 809023b..3d2e82a 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java
@@ -292,6 +292,76 @@ public class PojoInnerJoinTestApplication implements StreamingApplication
}
}
+ public static class OutputEvent
+ {
+ public int customerId;
+ public int productId;
+ public int productCategory;
+ public long timestamp;
+ public double amount;
+ public long timestamps;
+
+ public int getCustomerId()
+ {
+ return customerId;
+ }
+
+ public void setCustomerId(int customerId)
+ {
+ this.customerId = customerId;
+ }
+
+ public int getProductId()
+ {
+ return productId;
+ }
+
+ public void setProductId(int productId)
+ {
+ this.productId = productId;
+ }
+
+ public int getProductCategory()
+ {
+ return productCategory;
+ }
+
+ public void setProductCategory(int productCategory)
+ {
+ this.productCategory = productCategory;
+ }
+
+ public long getTimestamp()
+ {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp)
+ {
+ this.timestamp = timestamp;
+ }
+
+ public double getAmount()
+ {
+ return amount;
+ }
+
+ public void setAmount(double amount)
+ {
+ this.amount = amount;
+ }
+
+ public long getTimestamps()
+ {
+ return timestamps;
+ }
+
+ public void setTimestamps(long timestamp)
+ {
+ this.timestamps = timestamp;
+ }
+ }
+
public int getMaxProductId()
{
return maxProductId;
@@ -375,7 +445,7 @@ public class PojoInnerJoinTestApplication implements StreamingApplication
productGenerator.setSalesEvent(false);
WindowedMergeOperatorImpl<POJOGenerator.SalesEvent, POJOGenerator.ProductEvent, List<Set<Object>>, List<List<Object>>> op
= dag.addOperator("Merge", new WindowedMergeOperatorImpl<POJOGenerator.SalesEvent, POJOGenerator.ProductEvent, List<Set<Object>>, List<List<Object>>>());
- op.setAccumulation(new PojoInnerJoin(2, Object.class, "productId","productId"));
+ op.setAccumulation(new PojoInnerJoin(2, POJOGenerator.OutputEvent.class, "productId","productId"));
op.setDataStorage(new InMemoryWindowedStorage<List<Set<Object>>>());
WindowedStorage.WindowedPlainStorage<WindowState> windowStateMap = new InMemoryWindowedStorage<>();