You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2017/01/06 06:01:12 UTC

[1/2] apex-malhar git commit: APEXMALHAR-2343 The input type of the accumulation can be a super type of the input tuple type

Repository: apex-malhar
Updated Branches:
  refs/heads/master 70caa8909 -> bb3dca1b4


APEXMALHAR-2343 The input type of the accumulation can be a super type of the input tuple type


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/bb3dca1b
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/bb3dca1b
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/bb3dca1b

Branch: refs/heads/master
Commit: bb3dca1b4f4bc770d422f8683919bbe70cdc41d9
Parents: e476334
Author: David Yan <da...@datatorrent.com>
Authored: Tue Nov 29 17:15:55 2016 -0800
Committer: Siyuan Hua <hs...@apache.org>
Committed: Wed Jan 4 22:59:05 2017 -0800

----------------------------------------------------------------------
 .../apache/apex/malhar/lib/window/accumulation/Count.java |  4 ++--
 .../apex/malhar/lib/window/accumulation/SumDouble.java    |  2 +-
 .../apex/malhar/lib/window/accumulation/SumFloat.java     |  2 +-
 .../apex/malhar/lib/window/accumulation/SumInt.java       |  2 +-
 .../apex/malhar/lib/window/accumulation/SumLong.java      |  2 +-
 .../malhar/lib/window/impl/KeyedWindowedOperatorImpl.java |  2 +-
 .../apex/malhar/lib/window/impl/WindowedOperatorImpl.java |  2 +-
 .../apex/malhar/lib/window/accumulation/SumTest.java      |  8 ++++----
 .../malhar/stream/api/impl/ApexWindowedStreamImpl.java    | 10 +++++-----
 9 files changed, 17 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
index 7a46e22..dbc9f0f 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
@@ -26,7 +26,7 @@ import org.apache.commons.lang3.mutable.MutableLong;
  *
  * @since 3.5.0
  */
-public class Count<T> implements Accumulation<T, MutableLong, Long>
+public class Count implements Accumulation<Object, MutableLong, Long>
 {
 
   @Override
@@ -36,7 +36,7 @@ public class Count<T> implements Accumulation<T, MutableLong, Long>
   }
 
   @Override
-  public MutableLong accumulate(MutableLong accumulatedValue, T input)
+  public MutableLong accumulate(MutableLong accumulatedValue, Object input)
   {
     accumulatedValue.increment();
     return accumulatedValue;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
index 475d653..cfca1f3 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
@@ -19,7 +19,7 @@
 package org.apache.apex.malhar.lib.window.accumulation;
 
 import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang.mutable.MutableDouble;
+import org.apache.commons.lang3.mutable.MutableDouble;
 
 /**
  * Sum Accumulation for doubles.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
index dff3be6..dec3308 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
@@ -19,7 +19,7 @@
 package org.apache.apex.malhar.lib.window.accumulation;
 
 import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang.mutable.MutableFloat;
+import org.apache.commons.lang3.mutable.MutableFloat;
 
 /**
  * Sum Accumulation for floats.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
index dca67a4..e4e4d26 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
@@ -19,7 +19,7 @@
 package org.apache.apex.malhar.lib.window.accumulation;
 
 import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.commons.lang3.mutable.MutableInt;
 
 /**
  * Sum accumulation for integers.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
index 027e4f8..74df427 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
@@ -19,7 +19,7 @@
 package org.apache.apex.malhar.lib.window.accumulation;
 
 import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.commons.lang3.mutable.MutableLong;
 
 /**
  * Sum accumulation for longs.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
index b01fe61..deb718b 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
@@ -48,7 +48,7 @@ import com.datatorrent.lib.util.KeyValPair;
  */
 @InterfaceStability.Evolving
 public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
-    extends AbstractWindowedOperator<KeyValPair<KeyT, InputValT>, KeyValPair<KeyT, OutputValT>, WindowedStorage.WindowedKeyedStorage<KeyT, AccumT>, WindowedStorage.WindowedKeyedStorage<KeyT, OutputValT>, Accumulation<InputValT, AccumT, OutputValT>>
+    extends AbstractWindowedOperator<KeyValPair<KeyT, InputValT>, KeyValPair<KeyT, OutputValT>, WindowedStorage.WindowedKeyedStorage<KeyT, AccumT>, WindowedStorage.WindowedKeyedStorage<KeyT, OutputValT>, Accumulation<? super InputValT, AccumT, OutputValT>>
 {
 
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
index 26e011a..867d1c1 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.classification.InterfaceStability;
  */
 @InterfaceStability.Evolving
 public class WindowedOperatorImpl<InputT, AccumT, OutputT>
-    extends AbstractWindowedOperator<InputT, OutputT, WindowedStorage.WindowedPlainStorage<AccumT>, WindowedStorage.WindowedPlainStorage<OutputT>, Accumulation<InputT, AccumT, OutputT>>
+    extends AbstractWindowedOperator<InputT, OutputT, WindowedStorage.WindowedPlainStorage<AccumT>, WindowedStorage.WindowedPlainStorage<OutputT>, Accumulation<? super InputT, AccumT, OutputT>>
 {
   @Override
   public void accumulateTuple(Tuple.WindowedTuple<InputT> tuple)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java
index 4587a91..cdc48a1 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java
@@ -20,10 +20,10 @@ package org.apache.apex.malhar.lib.window.accumulation;
 
 import org.junit.Assert;
 import org.junit.Test;
-import org.apache.commons.lang.mutable.MutableDouble;
-import org.apache.commons.lang.mutable.MutableFloat;
-import org.apache.commons.lang.mutable.MutableInt;
-import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.commons.lang3.mutable.MutableDouble;
+import org.apache.commons.lang3.mutable.MutableFloat;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.lang3.mutable.MutableLong;
 
 /**
  * Test for different Sum Accumulations.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
index 5866a4c..9087f35 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
@@ -28,9 +28,9 @@ import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.WindowOption;
 import org.apache.apex.malhar.lib.window.WindowState;
 
-import org.apache.apex.malhar.lib.window.accumulation.Count;
 import org.apache.apex.malhar.lib.window.accumulation.FoldFn;
 import org.apache.apex.malhar.lib.window.accumulation.ReduceFn;
+import org.apache.apex.malhar.lib.window.accumulation.SumLong;
 import org.apache.apex.malhar.lib.window.accumulation.TopN;
 import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
 import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
@@ -99,7 +99,7 @@ public class ApexWindowedStreamImpl<T> extends ApexStreamImpl<T> implements Wind
     };
 
     WindowedStream<Tuple<Long>> innerstream = map(kVMap);
-    WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createWindowedOperator(new Count());
+    WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createWindowedOperator(new SumLong());
     return innerstream.addOperator(windowedOperator, windowedOperator.input, windowedOperator.output, opts);
   }
 
@@ -107,7 +107,7 @@ public class ApexWindowedStreamImpl<T> extends ApexStreamImpl<T> implements Wind
   public <K, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, Long>>>> STREAM countByKey(Function.ToKeyValue<T, K, Long> convertToKeyValue, Option... opts)
   {
     WindowedStream<Tuple<KeyValPair<K, Long>>> kvstream = map(convertToKeyValue);
-    KeyedWindowedOperatorImpl<K, Long, MutableLong, Long> keyedWindowedOperator = createKeyedWindowedOperator(new Count());
+    KeyedWindowedOperatorImpl<K, Long, MutableLong, Long> keyedWindowedOperator = createKeyedWindowedOperator(new SumLong());
     return kvstream.addOperator(keyedWindowedOperator, keyedWindowedOperator.input, keyedWindowedOperator.output, opts);
   }
 
@@ -231,7 +231,7 @@ public class ApexWindowedStreamImpl<T> extends ApexStreamImpl<T> implements Wind
    * @param <OUT>
    * @return
    */
-  private <IN, ACCU, OUT> WindowedOperatorImpl<IN, ACCU, OUT> createWindowedOperator(Accumulation<IN, ACCU, OUT> accumulationFn)
+  private <IN, ACCU, OUT> WindowedOperatorImpl<IN, ACCU, OUT> createWindowedOperator(Accumulation<? super IN, ACCU, OUT> accumulationFn)
   {
     WindowedOperatorImpl<IN, ACCU, OUT> windowedOperator = new WindowedOperatorImpl<>();
     //TODO use other default setting in the future
@@ -251,7 +251,7 @@ public class ApexWindowedStreamImpl<T> extends ApexStreamImpl<T> implements Wind
     return windowedOperator;
   }
 
-  private <K, V, ACCU, OUT> KeyedWindowedOperatorImpl<K, V, ACCU, OUT> createKeyedWindowedOperator(Accumulation<V, ACCU, OUT> accumulationFn)
+  private <K, V, ACCU, OUT> KeyedWindowedOperatorImpl<K, V, ACCU, OUT> createKeyedWindowedOperator(Accumulation<? super V, ACCU, OUT> accumulationFn)
   {
     KeyedWindowedOperatorImpl<K, V, ACCU, OUT> keyedWindowedOperator = new KeyedWindowedOperatorImpl<>();
 


[2/2] apex-malhar git commit: APEXMALHAR-2343 #resolve #comment Count Accumulation should only increase by one for each tuple

Posted by hs...@apache.org.
APEXMALHAR-2343 #resolve #comment Count Accumulation should only increase by one for each tuple


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/e4763344
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/e4763344
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/e4763344

Branch: refs/heads/master
Commit: e4763344b1c0f54e2501fbb97c12100360e4a58d
Parents: 70caa89
Author: brightchen <br...@datatorrent.com>
Authored: Wed Nov 16 11:29:46 2016 -0800
Committer: Siyuan Hua <hs...@apache.org>
Committed: Wed Jan 4 22:59:05 2017 -0800

----------------------------------------------------------------------
 .../org/apache/apex/malhar/lib/window/accumulation/Count.java  | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e4763344/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
index 62c5678..7a46e22 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
@@ -26,7 +26,7 @@ import org.apache.commons.lang3.mutable.MutableLong;
  *
  * @since 3.5.0
  */
-public class Count implements Accumulation<Long, MutableLong, Long>
+public class Count<T> implements Accumulation<T, MutableLong, Long>
 {
 
   @Override
@@ -36,9 +36,9 @@ public class Count implements Accumulation<Long, MutableLong, Long>
   }
 
   @Override
-  public MutableLong accumulate(MutableLong accumulatedValue, Long input)
+  public MutableLong accumulate(MutableLong accumulatedValue, T input)
   {
-    accumulatedValue.add(input);
+    accumulatedValue.increment();
     return accumulatedValue;
   }