You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by GitBox <gi...@apache.org> on 2018/12/19 23:22:54 UTC

[GitHub] nwangtw closed pull request #3124: Nwang/fix streamwindowoperator generic types

nwangtw closed pull request #3124: Nwang/fix streamwindowoperator generic types
URL: https://github.com/apache/incubator-heron/pull/3124
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff of a foreign (forked) pull request
once it is merged, the diff is reproduced below for the sake of provenance:

diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index 6b09de2a94..7ca0f4343f 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -454,15 +454,15 @@ private String defaultNameCalculator(StreamletNamePrefix prefix, Set<String> sta
    * @param reduceFn The reduce function that you want to apply to all the values of a key.
    */
   @Override
-  public <K, V> Streamlet<KeyValue<KeyedWindow<K>, V>> reduceByKeyAndWindow(
-      SerializableFunction<R, K> keyExtractor, SerializableFunction<R, V> valueExtractor,
-      WindowConfig windowCfg, SerializableBinaryOperator<V> reduceFn) {
+  public <K, T> Streamlet<KeyValue<KeyedWindow<K>, T>> reduceByKeyAndWindow(
+      SerializableFunction<R, K> keyExtractor, SerializableFunction<R, T> valueExtractor,
+      WindowConfig windowCfg, SerializableBinaryOperator<T> reduceFn) {
     checkNotNull(keyExtractor, "keyExtractor cannot be null");
     checkNotNull(valueExtractor, "valueExtractor cannot be null");
     checkNotNull(windowCfg, "windowCfg cannot be null");
     checkNotNull(reduceFn, "reduceFn cannot be null");
 
-    ReduceByKeyAndWindowStreamlet<K, V, R> retval =
+    ReduceByKeyAndWindowStreamlet<R, K, T> retval =
         new ReduceByKeyAndWindowStreamlet<>(this, keyExtractor, valueExtractor,
             windowCfg, reduceFn);
     addChild(retval);
@@ -491,7 +491,7 @@ private String defaultNameCalculator(StreamletNamePrefix prefix, Set<String> sta
     checkNotNull(identity, "identity cannot be null");
     checkNotNull(reduceFn, "reduceFn cannot be null");
 
-    GeneralReduceByKeyAndWindowStreamlet<K, R, T> retval =
+    GeneralReduceByKeyAndWindowStreamlet<R, K, T> retval =
         new GeneralReduceByKeyAndWindowStreamlet<>(this, keyExtractor, windowCfg,
             identity, reduceFn);
     addChild(retval);
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/groupings/JoinCustomGrouping.java b/heron/api/src/java/org/apache/heron/streamlet/impl/groupings/JoinCustomGrouping.java
index f72094112f..d91697d0d6 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/groupings/JoinCustomGrouping.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/groupings/JoinCustomGrouping.java
@@ -33,12 +33,12 @@
  * JoinOperator. It essentially ensures that the values being routed are of type
  * KeyValue uses the key to route the tuple to the destination.
  */
-public class JoinCustomGrouping<K, V> implements CustomStreamGrouping {
+public class JoinCustomGrouping<R, K> implements CustomStreamGrouping {
   private static final long serialVersionUID = 2007892247960031525L;
-  private SerializableFunction<V, K> keyExtractor;
+  private SerializableFunction<R, K> keyExtractor;
   private List<Integer> taskIds;
 
-  public JoinCustomGrouping(SerializableFunction<V, K> keyExtractor) {
+  public JoinCustomGrouping(SerializableFunction<R, K> keyExtractor) {
     this.keyExtractor = keyExtractor;
   }
 
@@ -52,7 +52,7 @@ public void prepare(TopologyContext context, String component,
   @Override
   public List<Integer> chooseTasks(List<Object> values) {
     List<Integer> ret = new ArrayList<>();
-    V obj = (V) values.get(0);
+    R obj = (R) values.get(0);
     int key = keyExtractor.apply(obj).hashCode();
     ret.add(Utils.assignKeyToTask(key, taskIds));
     return ret;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/groupings/ReduceByKeyAndWindowCustomGrouping.java b/heron/api/src/java/org/apache/heron/streamlet/impl/groupings/ReduceByKeyAndWindowCustomGrouping.java
index 7c228d3618..560b5942f9 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/groupings/ReduceByKeyAndWindowCustomGrouping.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/groupings/ReduceByKeyAndWindowCustomGrouping.java
@@ -36,12 +36,12 @@
  * The current implementation is identical to JoinCustomGrouping but it might
  * evolve in the future.
  */
-public class ReduceByKeyAndWindowCustomGrouping<K, V> implements CustomStreamGrouping {
+public class ReduceByKeyAndWindowCustomGrouping<R, K> implements CustomStreamGrouping {
   private static final long serialVersionUID = -7630948017550637716L;
-  private SerializableFunction<V, K> keyExtractor;
+  private SerializableFunction<R, K> keyExtractor;
   private List<Integer> taskIds;
 
-  public ReduceByKeyAndWindowCustomGrouping(SerializableFunction<V, K> keyExtractor) {
+  public ReduceByKeyAndWindowCustomGrouping(SerializableFunction<R, K> keyExtractor) {
     this.keyExtractor = keyExtractor;
   }
 
@@ -55,7 +55,7 @@ public void prepare(TopologyContext context, String component,
   @Override
   public List<Integer> chooseTasks(List<Object> values) {
     List<Integer> ret = new ArrayList<>();
-    V obj = (V) values.get(0);
+    R obj = (R) values.get(0);
     int key = keyExtractor.apply(obj).hashCode();
     ret.add(Utils.assignKeyToTask(key, taskIds));
     return ret;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
index fbdc4db44f..b3a2cc2fdf 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
@@ -36,16 +36,18 @@
  * It takes in a reduceFunction Function as an input.
  * For every time window, the bolt goes over all the tuples in that window and applies the reduce
  * function grouped by keys. It emits a KeyedWindow, reduced Value KeyPairs as outputs
+ * R: Incoming data type, K: Key type, T: Result data type
  */
-public class GeneralReduceByKeyAndWindowOperator<K, V, VR> extends StreamletWindowOperator<V, V> {
-  private static final long serialVersionUID = 2833576046687752396L;
-  private SerializableFunction<V, K> keyExtractor;
-  private VR identity;
-  private SerializableBiFunction<VR, V, ? extends VR> reduceFn;
+public class GeneralReduceByKeyAndWindowOperator<R, K, T>
+    extends StreamletWindowOperator<R, KeyValue<KeyedWindow<K>, T>> {
 
-  public GeneralReduceByKeyAndWindowOperator(SerializableFunction<V, K> keyExtractor,
-                                             VR identity,
-                                             SerializableBiFunction<VR, V, ? extends VR> reduceFn) {
+  private SerializableFunction<R, K> keyExtractor;
+  private T identity;
+  private SerializableBiFunction<T, R, ? extends T> reduceFn;
+
+  public GeneralReduceByKeyAndWindowOperator(SerializableFunction<R, K> keyExtractor,
+                                             T identity,
+                                             SerializableBiFunction<T, R, ? extends T> reduceFn) {
     this.keyExtractor = keyExtractor;
     this.identity = identity;
     this.reduceFn = reduceFn;
@@ -54,10 +56,10 @@ public GeneralReduceByKeyAndWindowOperator(SerializableFunction<V, K> keyExtract
   @SuppressWarnings("unchecked")
   @Override
   public void execute(TupleWindow inputWindow) {
-    Map<K, VR> reduceMap = new HashMap<>();
+    Map<K, T> reduceMap = new HashMap<>();
     Map<K, Integer> windowCountMap = new HashMap<>();
     for (Tuple tuple : inputWindow.get()) {
-      V tup = (V) tuple.getValue(0);
+      R tup = (R) tuple.getValue(0);
       addMap(reduceMap, windowCountMap, tup);
     }
     long startWindow;
@@ -79,7 +81,7 @@ public void execute(TupleWindow inputWindow) {
     }
   }
 
-  private void addMap(Map<K, VR> reduceMap, Map<K, Integer> windowCountMap, V tup) {
+  private void addMap(Map<K, T> reduceMap, Map<K, Integer> windowCountMap, R tup) {
     K key = keyExtractor.apply(tup);
     if (!reduceMap.containsKey(key)) {
       reduceMap.put(key, identity);
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java
index 92c2d20fb6..6649848868 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java
@@ -36,17 +36,18 @@
  * It takes in a reduceFunction Function as an input.
  * For every time window, the bolt goes over all the tuples in that window and applies the reduce
  * function grouped by keys. It emits a KeyedWindow, reduced Value KeyPairs as outputs
+ * R: Incoming data type, K: Key type, T: Result data type
  */
-public class ReduceByKeyAndWindowOperator<K, V, R> extends StreamletWindowOperator<R, V> {
+public class ReduceByKeyAndWindowOperator<R, K, T>
+    extends StreamletWindowOperator<R, KeyValue<KeyedWindow<K>, T>> {
 
-  private static final long serialVersionUID = 2833576046687750496L;
   private SerializableFunction<R, K> keyExtractor;
-  private SerializableFunction<R, V> valueExtractor;
-  private SerializableBinaryOperator<V> reduceFn;
+  private SerializableFunction<R, T> valueExtractor;
+  private SerializableBinaryOperator<T> reduceFn;
 
   public ReduceByKeyAndWindowOperator(SerializableFunction<R, K> keyExtractor,
-                                      SerializableFunction<R, V> valueExtractor,
-                                      SerializableBinaryOperator<V> reduceFn) {
+                                      SerializableFunction<R, T> valueExtractor,
+                                      SerializableBinaryOperator<T> reduceFn) {
     this.keyExtractor = keyExtractor;
     this.valueExtractor = valueExtractor;
     this.reduceFn = reduceFn;
@@ -55,7 +56,7 @@ public ReduceByKeyAndWindowOperator(SerializableFunction<R, K> keyExtractor,
   @SuppressWarnings("unchecked")
   @Override
   public void execute(TupleWindow inputWindow) {
-    Map<K, V> reduceMap = new HashMap<>();
+    Map<K, T> reduceMap = new HashMap<>();
     Map<K, Integer> windowCountMap = new HashMap<>();
     for (Tuple tuple : inputWindow.get()) {
       R tup = (R) tuple.getValue(0);
@@ -80,7 +81,7 @@ public void execute(TupleWindow inputWindow) {
     }
   }
 
-  private void addMap(Map<K, V> reduceMap, Map<K, Integer> windowCountMap, R tup) {
+  private void addMap(Map<K, T> reduceMap, Map<K, Integer> windowCountMap, R tup) {
     K key = keyExtractor.apply(tup);
     if (reduceMap.containsKey(key)) {
       reduceMap.put(key, reduceFn.apply(reduceMap.get(key), valueExtractor.apply(tup)));
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
index b66956f41d..f95d7d24fe 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
@@ -37,21 +37,21 @@
  * applying user supplied reduceFn on all elements within each window defined by a
  * user supplied Window Config.
  * ReduceByKeyAndWindowStreamlet's elements are of KeyValue type where the key is
- * KeyWindowInfo&lt;K&gt; type and the value is of type V.
+ * KeyedWindow&lt;K&gt; type and the value is of type T.
  */
-public class GeneralReduceByKeyAndWindowStreamlet<K, V, VR>
-    extends StreamletImpl<KeyValue<KeyedWindow<K>, VR>> {
-  private StreamletImpl<V> parent;
-  private SerializableFunction<V, K> keyExtractor;
+public class GeneralReduceByKeyAndWindowStreamlet<R, K, T>
+    extends StreamletImpl<KeyValue<KeyedWindow<K>, T>> {
+  private StreamletImpl<R> parent;
+  private SerializableFunction<R, K> keyExtractor;
   private WindowConfig windowCfg;
-  private VR identity;
-  private SerializableBiFunction<VR, V, ? extends VR> reduceFn;
+  private T identity;
+  private SerializableBiFunction<T, R, ? extends T> reduceFn;
 
-  public GeneralReduceByKeyAndWindowStreamlet(StreamletImpl<V> parent,
-                            SerializableFunction<V, K> keyExtractor,
+  public GeneralReduceByKeyAndWindowStreamlet(StreamletImpl<R> parent,
+                            SerializableFunction<R, K> keyExtractor,
                             WindowConfig windowCfg,
-                            VR identity,
-                            SerializableBiFunction<VR, V, ? extends VR> reduceFn) {
+                            T identity,
+                            SerializableBiFunction<T, R, ? extends T> reduceFn) {
     this.parent = parent;
     this.keyExtractor = keyExtractor;
     this.windowCfg = windowCfg;
@@ -63,12 +63,12 @@ public GeneralReduceByKeyAndWindowStreamlet(StreamletImpl<V> parent,
   @Override
   public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
     setDefaultNameIfNone(StreamletNamePrefix.REDUCE, stageNames);
-    GeneralReduceByKeyAndWindowOperator<K, V, VR> bolt =
-        new GeneralReduceByKeyAndWindowOperator<K, V, VR>(keyExtractor, identity, reduceFn);
+    GeneralReduceByKeyAndWindowOperator<R, K, T> bolt =
+        new GeneralReduceByKeyAndWindowOperator<R, K, T>(keyExtractor, identity, reduceFn);
     windowCfg.applyTo(bolt);
     bldr.setBolt(getName(), bolt, getNumPartitions())
         .customGrouping(parent.getName(), parent.getStreamId(),
-            new ReduceByKeyAndWindowCustomGrouping<K, V>(keyExtractor));
+            new ReduceByKeyAndWindowCustomGrouping<R, K>(keyExtractor));
     return true;
   }
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
index 3f3effb300..40f76ef259 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
@@ -92,9 +92,9 @@ public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
     windowCfg.applyTo(bolt);
     bldr.setBolt(getName(), bolt, getNumPartitions())
         .customGrouping(left.getName(), left.getStreamId(),
-            new JoinCustomGrouping<K, R>(leftKeyExtractor))
+            new JoinCustomGrouping<R, K>(leftKeyExtractor))
         .customGrouping(right.getName(), right.getStreamId(),
-            new JoinCustomGrouping<K, S>(rightKeyExtractor));
+            new JoinCustomGrouping<S, K>(rightKeyExtractor));
     return true;
   }
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
index 24052a8ef0..18fea4d714 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
@@ -37,21 +37,21 @@
  * applying user supplied reduceFn on all elements within each window defined by a
  * user supplied Window Config.
  * ReduceByKeyAndWindowStreamlet's elements are of KeyValue type where the key is
- * KeyWindowInfo&lt;K&gt; type and the value is of type V.
+ * KeyedWindow&lt;K&gt; type and the value is of type T.
  */
-public class ReduceByKeyAndWindowStreamlet<K, V, R>
-    extends StreamletImpl<KeyValue<KeyedWindow<K>, V>> {
+public class ReduceByKeyAndWindowStreamlet<R, K, T>
+    extends StreamletImpl<KeyValue<KeyedWindow<K>, T>> {
   private StreamletImpl<R> parent;
   private SerializableFunction<R, K> keyExtractor;
-  private SerializableFunction<R, V> valueExtractor;
+  private SerializableFunction<R, T> valueExtractor;
   private WindowConfig windowCfg;
-  private SerializableBinaryOperator<V> reduceFn;
+  private SerializableBinaryOperator<T> reduceFn;
 
   public ReduceByKeyAndWindowStreamlet(StreamletImpl<R> parent,
                        SerializableFunction<R, K> keyExtractor,
-                       SerializableFunction<R, V> valueExtractor,
+                       SerializableFunction<R, T> valueExtractor,
                        WindowConfig windowCfg,
-                       SerializableBinaryOperator<V> reduceFn) {
+                       SerializableBinaryOperator<T> reduceFn) {
     this.parent = parent;
     this.keyExtractor = keyExtractor;
     this.valueExtractor = valueExtractor;
@@ -63,12 +63,12 @@ public ReduceByKeyAndWindowStreamlet(StreamletImpl<R> parent,
   @Override
   public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
     setDefaultNameIfNone(StreamletNamePrefix.REDUCE, stageNames);
-    ReduceByKeyAndWindowOperator<K, V, R> bolt = new ReduceByKeyAndWindowOperator<>(keyExtractor,
+    ReduceByKeyAndWindowOperator<R, K, T> bolt = new ReduceByKeyAndWindowOperator<>(keyExtractor,
         valueExtractor, reduceFn);
     windowCfg.applyTo(bolt);
     bldr.setBolt(getName(), bolt, getNumPartitions())
         .customGrouping(parent.getName(), parent.getStreamId(),
-            new ReduceByKeyAndWindowCustomGrouping<K, R>(keyExtractor));
+            new ReduceByKeyAndWindowCustomGrouping<R, K>(keyExtractor));
     return true;
   }
 }
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperatorTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperatorTest.java
index 6154d3fa39..10a945c2d4 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperatorTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperatorTest.java
@@ -59,7 +59,7 @@ public void setUp() {
   @Test
   @SuppressWarnings({"rawtypes", "unchecked"})
   public void testReduceByWindowOperator() {
-    GeneralReduceByKeyAndWindowOperator<String, KeyValue<String, Integer>, Integer> reduceOperator =
+    GeneralReduceByKeyAndWindowOperator<KeyValue<String, Integer>, String, Integer> reduceOperator =
         getReduceByWindowOperator(12);
 
     TupleWindow tupleWindow = getTupleWindow(3, 5);
@@ -106,9 +106,9 @@ private TupleWindow getTupleWindow(int nkeys, int count) {
 
 
   @SuppressWarnings({"rawtypes", "unchecked"})
-  private GeneralReduceByKeyAndWindowOperator<String, KeyValue<String, Integer>, Integer>
+  private GeneralReduceByKeyAndWindowOperator<KeyValue<String, Integer>, String, Integer>
         getReduceByWindowOperator(Integer identity) {
-    GeneralReduceByKeyAndWindowOperator<String, KeyValue<String, Integer>, Integer>
+    GeneralReduceByKeyAndWindowOperator<KeyValue<String, Integer>, String, Integer>
         reduceByWindowOperator = new GeneralReduceByKeyAndWindowOperator<>(
             x -> x.getKey(), identity, (o, o2) -> o + o2.getValue());
 
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperatorTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperatorTest.java
index 8083c01624..0e57bc7ad2 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperatorTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperatorTest.java
@@ -59,7 +59,7 @@ public void setUp() {
   @Test
   @SuppressWarnings({"rawtypes", "unchecked"})
   public void testReduceByWindowOperator() {
-    ReduceByKeyAndWindowOperator<String, Integer, String> reduceOperator =
+    ReduceByKeyAndWindowOperator<String, String, Integer> reduceOperator =
         getReduceByWindowOperator();
 
     TupleWindow tupleWindow = getTupleWindow(3, 5);
@@ -106,8 +106,8 @@ private TupleWindow getTupleWindow(int nkeys, int count) {
 
 
   @SuppressWarnings({"rawtypes", "unchecked"})
-  private ReduceByKeyAndWindowOperator<String, Integer, String> getReduceByWindowOperator() {
-    ReduceByKeyAndWindowOperator<String, Integer, String> reduceByWindowOperator =
+  private ReduceByKeyAndWindowOperator<String, String, Integer> getReduceByWindowOperator() {
+    ReduceByKeyAndWindowOperator<String, String, Integer> reduceByWindowOperator =
         new ReduceByKeyAndWindowOperator<>(x -> x, x -> 1, (o, o2) -> o + o2);
 
     reduceByWindowOperator.prepare(new Config(), PowerMockito.mock(TopologyContext.class),


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services