You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by tu...@apache.org on 2017/02/23 11:22:26 UTC

apex-malhar git commit: APEXMALHAR-2414 PojoInner Join accumulation was using java reflection

Repository: apex-malhar
Updated Branches:
  refs/heads/master 8486493a0 -> dd5341f22


APEXMALHAR-2414 PojoInner Join accumulation was using java reflection


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/dd5341f2
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/dd5341f2
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/dd5341f2

Branch: refs/heads/master
Commit: dd5341f222b8141b76ecbc5ea9653c70b3c78c44
Parents: 8486493
Author: Hitesh-Scorpio <fo...@gmail.com>
Authored: Wed Feb 22 23:55:38 2017 +0530
Committer: Tushar R. Gosavi <tu...@apache.org>
Committed: Thu Feb 23 16:51:01 2017 +0530

----------------------------------------------------------------------
 .../lib/window/accumulation/PojoInnerJoin.java  | 70 +++++++++++++------
 .../window/accumulation/PojoInnerJoinTest.java  | 24 +++----
 .../impl/PojoInnerJoinTestApplication.java      | 72 +++++++++++++++++++-
 3 files changed, 132 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dd5341f2/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
index 1872d19..a5b1117 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
@@ -27,9 +27,13 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.apex.malhar.lib.window.MergeAccumulation;
+import org.apache.commons.lang3.ClassUtils;
 
 import com.google.common.base.Throwables;
 
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.PojoUtils;
+
 /**
  * Inner join Accumulation for Pojo Streams.
  *
@@ -40,6 +44,9 @@ public class PojoInnerJoin<InputT1, InputT2>
 {
   protected final String[] keys;
   protected final Class<?> outClass;
+  private transient List<KeyValPair<String,PojoUtils.Getter>>  gettersStream1;
+  private transient List<KeyValPair<String,PojoUtils.Getter>>  gettersStream2;
+  private transient List<KeyValPair<String,PojoUtils.Setter>>  setters;
 
   public PojoInnerJoin()
   {
@@ -57,9 +64,35 @@ public class PojoInnerJoin<InputT1, InputT2>
     this.outClass = outClass;
   }
 
+  private void createSetters()
+  {
+    Field[] fields = outClass.getDeclaredFields();
+    setters = new ArrayList<>();
+    for (Field field : fields) {
+      Class outputField = ClassUtils.primitiveToWrapper(field.getType());
+      String fieldName = field.getName();
+      setters.add(new KeyValPair<>(fieldName,PojoUtils.createSetter(outClass,fieldName,outputField)));
+    }
+  }
+
+  private List<KeyValPair<String,PojoUtils.Getter>> createGetters(Class<?> input)
+  {
+    Field[] fields = input.getDeclaredFields();
+    List<KeyValPair<String,PojoUtils.Getter>> getters = new ArrayList<>();
+    for (Field field : fields) {
+      Class inputField = ClassUtils.primitiveToWrapper(field.getType());
+      String fieldName = field.getName();
+      getters.add(new KeyValPair<>(fieldName,PojoUtils.createGetter(input, fieldName, inputField)));
+    }
+    return getters;
+  }
+
   @Override
   public List<List<Map<String, Object>>> accumulate(List<List<Map<String, Object>>> accumulatedValue, InputT1 input)
   {
+    if (gettersStream1 == null) {
+      gettersStream1 = createGetters(input.getClass());
+    }
     try {
       return accumulateWithIndex(0, accumulatedValue, input);
     } catch (NoSuchFieldException e) {
@@ -70,6 +103,9 @@ public class PojoInnerJoin<InputT1, InputT2>
   @Override
   public List<List<Map<String, Object>>> accumulate2(List<List<Map<String, Object>>> accumulatedValue, InputT2 input)
   {
+    if (gettersStream2 == null) {
+      gettersStream2 = createGetters(input.getClass());
+    }
     try {
       return accumulateWithIndex(1, accumulatedValue, input);
     } catch (NoSuchFieldException e) {
@@ -96,27 +132,23 @@ public class PojoInnerJoin<InputT1, InputT2>
     input.getClass().getDeclaredField(keys[index]);
 
     List<Map<String, Object>> curList = accu.get(index);
-    Map map = pojoToMap(input);
+    Map map = pojoToMap(input,index + 1);
     curList.add(map);
     accu.set(index, curList);
 
     return accu;
   }
 
-  private Map<String, Object> pojoToMap(Object input)
+  private Map<String, Object> pojoToMap(Object input, int streamIndex)
   {
     Map<String, Object> map = new HashMap<>();
+    List<KeyValPair<String,PojoUtils.Getter>> gettersStream = streamIndex == 1 ? gettersStream1 : gettersStream2;
 
-    Field[] fields = input.getClass().getDeclaredFields();
-
-    for (Field field : fields) {
-      String[] words = field.getName().split("\\.");
-      String fieldName = words[words.length - 1];
-      field.setAccessible(true);
+    for (KeyValPair<String,PojoUtils.Getter> getter : gettersStream) {
       try {
-        Object value = field.get(input);
-        map.put(fieldName, value);
-      } catch (IllegalAccessException e) {
+        Object value = getter.getValue().get(input);
+        map.put(getter.getKey(), value);
+      } catch (Exception e) {
         throw Throwables.propagate(e);
       }
     }
@@ -142,6 +174,10 @@ public class PojoInnerJoin<InputT1, InputT2>
     // TODO: May need to revisit (use state manager).
     result = getAllCombo(0, accumulatedValue, result, null);
 
+    if (setters == null) {
+      createSetters();
+    }
+
     List<Object> out = new ArrayList<>();
     for (Map<String, Object> resultMap : result) {
       Object o;
@@ -150,16 +186,8 @@ public class PojoInnerJoin<InputT1, InputT2>
       } catch (Throwable e) {
         throw Throwables.propagate(e);
       }
-
-      for (Map.Entry<String, Object> entry : resultMap.entrySet()) {
-        Field f;
-        try {
-          f = outClass.getDeclaredField(entry.getKey());
-          f.setAccessible(true);
-          f.set(o, entry.getValue());
-        } catch (NoSuchFieldException | IllegalAccessException e) {
-          throw Throwables.propagate(e);
-        }
+      for (KeyValPair<String, PojoUtils.Setter> setter : setters) {
+        setter.getValue().set(o,resultMap.get(setter.getKey()));
       }
       out.add(o);
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dd5341f2/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
index 47ce815..47c7307 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
@@ -47,22 +47,22 @@ public class PojoInnerJoinTest
       this.uName = name;
     }
 
-    public int getuId()
+    public int getUId()
     {
       return uId;
     }
 
-    public void setuId(int uId)
+    public void setUId(int uId)
     {
       this.uId = uId;
     }
 
-    public String getuName()
+    public String getUName()
     {
       return uName;
     }
 
-    public void setuName(String uName)
+    public void setUName(String uName)
     {
       this.uName = uName;
     }
@@ -84,12 +84,12 @@ public class PojoInnerJoinTest
       this.dep = dep;
     }
 
-    public int getuId()
+    public int getUId()
     {
       return uId;
     }
 
-    public void setuId(int uId)
+    public void setUId(int uId)
     {
       this.uId = uId;
     }
@@ -111,22 +111,22 @@ public class PojoInnerJoinTest
     private String uName;
     private String dep;
 
-    public int getuId()
+    public int getUId()
     {
       return uId;
     }
 
-    public void setuId(int uId)
+    public void setUId(int uId)
     {
       this.uId = uId;
     }
 
-    public String getuName()
+    public String getUName()
     {
       return uName;
     }
 
-    public void setuName(String uName)
+    public void setUName(String uName)
     {
       this.uName = uName;
     }
@@ -168,8 +168,8 @@ public class PojoInnerJoinTest
     Object o = pij.getOutput(accu).get(0);
     Assert.assertTrue(o instanceof TestOutClass);
     TestOutClass testOutClass = (TestOutClass)o;
-    Assert.assertEquals(1, testOutClass.getuId());
-    Assert.assertEquals("Josh", testOutClass.getuName());
+    Assert.assertEquals(1, testOutClass.getUId());
+    Assert.assertEquals("Josh", testOutClass.getUName());
     Assert.assertEquals("CS", testOutClass.getDep());
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dd5341f2/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java
index 809023b..3d2e82a 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java
@@ -292,6 +292,76 @@ public class PojoInnerJoinTestApplication implements StreamingApplication
       }
     }
 
+    public static class OutputEvent
+    {
+      public int customerId;
+      public int productId;
+      public int productCategory;
+      public long timestamp;
+      public double amount;
+      public long timestamps;
+
+      public int getCustomerId()
+      {
+        return customerId;
+      }
+
+      public void setCustomerId(int customerId)
+      {
+        this.customerId = customerId;
+      }
+
+      public int getProductId()
+      {
+        return productId;
+      }
+
+      public void setProductId(int productId)
+      {
+        this.productId = productId;
+      }
+
+      public int getProductCategory()
+      {
+        return productCategory;
+      }
+
+      public void setProductCategory(int productCategory)
+      {
+        this.productCategory = productCategory;
+      }
+
+      public long getTimestamp()
+      {
+        return timestamp;
+      }
+
+      public void setTimestamp(long timestamp)
+      {
+        this.timestamp = timestamp;
+      }
+
+      public double getAmount()
+      {
+        return amount;
+      }
+
+      public void setAmount(double amount)
+      {
+        this.amount = amount;
+      }
+
+      public long getTimestamps()
+      {
+        return timestamps;
+      }
+
+      public void setTimestamps(long timestamp)
+      {
+        this.timestamps = timestamp;
+      }
+    }
+
     public int getMaxProductId()
     {
       return maxProductId;
@@ -375,7 +445,7 @@ public class PojoInnerJoinTestApplication implements StreamingApplication
     productGenerator.setSalesEvent(false);
     WindowedMergeOperatorImpl<POJOGenerator.SalesEvent, POJOGenerator.ProductEvent, List<Set<Object>>, List<List<Object>>> op
         = dag.addOperator("Merge", new WindowedMergeOperatorImpl<POJOGenerator.SalesEvent, POJOGenerator.ProductEvent, List<Set<Object>>, List<List<Object>>>());
-    op.setAccumulation(new PojoInnerJoin(2, Object.class, "productId","productId"));
+    op.setAccumulation(new PojoInnerJoin(2, POJOGenerator.OutputEvent.class, "productId","productId"));
     op.setDataStorage(new InMemoryWindowedStorage<List<Set<Object>>>());
 
     WindowedStorage.WindowedPlainStorage<WindowState> windowStateMap = new InMemoryWindowedStorage<>();