You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2017/01/06 06:01:12 UTC
[1/2] apex-malhar git commit: APEXMALHAR-2343 The input type of the
accumulation can be a super type of the input tuple type
Repository: apex-malhar
Updated Branches:
refs/heads/master 70caa8909 -> bb3dca1b4
APEXMALHAR-2343 The input type of the accumulation can be a super type of the input tuple type
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/bb3dca1b
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/bb3dca1b
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/bb3dca1b
Branch: refs/heads/master
Commit: bb3dca1b4f4bc770d422f8683919bbe70cdc41d9
Parents: e476334
Author: David Yan <da...@datatorrent.com>
Authored: Tue Nov 29 17:15:55 2016 -0800
Committer: Siyuan Hua <hs...@apache.org>
Committed: Wed Jan 4 22:59:05 2017 -0800
----------------------------------------------------------------------
.../apache/apex/malhar/lib/window/accumulation/Count.java | 4 ++--
.../apex/malhar/lib/window/accumulation/SumDouble.java | 2 +-
.../apex/malhar/lib/window/accumulation/SumFloat.java | 2 +-
.../apex/malhar/lib/window/accumulation/SumInt.java | 2 +-
.../apex/malhar/lib/window/accumulation/SumLong.java | 2 +-
.../malhar/lib/window/impl/KeyedWindowedOperatorImpl.java | 2 +-
.../apex/malhar/lib/window/impl/WindowedOperatorImpl.java | 2 +-
.../apex/malhar/lib/window/accumulation/SumTest.java | 8 ++++----
.../malhar/stream/api/impl/ApexWindowedStreamImpl.java | 10 +++++-----
9 files changed, 17 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
index 7a46e22..dbc9f0f 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
@@ -26,7 +26,7 @@ import org.apache.commons.lang3.mutable.MutableLong;
*
* @since 3.5.0
*/
-public class Count<T> implements Accumulation<T, MutableLong, Long>
+public class Count implements Accumulation<Object, MutableLong, Long>
{
@Override
@@ -36,7 +36,7 @@ public class Count<T> implements Accumulation<T, MutableLong, Long>
}
@Override
- public MutableLong accumulate(MutableLong accumulatedValue, T input)
+ public MutableLong accumulate(MutableLong accumulatedValue, Object input)
{
accumulatedValue.increment();
return accumulatedValue;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
index 475d653..cfca1f3 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
@@ -19,7 +19,7 @@
package org.apache.apex.malhar.lib.window.accumulation;
import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang.mutable.MutableDouble;
+import org.apache.commons.lang3.mutable.MutableDouble;
/**
* Sum Accumulation for doubles.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
index dff3be6..dec3308 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
@@ -19,7 +19,7 @@
package org.apache.apex.malhar.lib.window.accumulation;
import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang.mutable.MutableFloat;
+import org.apache.commons.lang3.mutable.MutableFloat;
/**
* Sum Accumulation for floats.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
index dca67a4..e4e4d26 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
@@ -19,7 +19,7 @@
package org.apache.apex.malhar.lib.window.accumulation;
import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.commons.lang3.mutable.MutableInt;
/**
* Sum accumulation for integers.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
index 027e4f8..74df427 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
@@ -19,7 +19,7 @@
package org.apache.apex.malhar.lib.window.accumulation;
import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.commons.lang3.mutable.MutableLong;
/**
* Sum accumulation for longs.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
index b01fe61..deb718b 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
@@ -48,7 +48,7 @@ import com.datatorrent.lib.util.KeyValPair;
*/
@InterfaceStability.Evolving
public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
- extends AbstractWindowedOperator<KeyValPair<KeyT, InputValT>, KeyValPair<KeyT, OutputValT>, WindowedStorage.WindowedKeyedStorage<KeyT, AccumT>, WindowedStorage.WindowedKeyedStorage<KeyT, OutputValT>, Accumulation<InputValT, AccumT, OutputValT>>
+ extends AbstractWindowedOperator<KeyValPair<KeyT, InputValT>, KeyValPair<KeyT, OutputValT>, WindowedStorage.WindowedKeyedStorage<KeyT, AccumT>, WindowedStorage.WindowedKeyedStorage<KeyT, OutputValT>, Accumulation<? super InputValT, AccumT, OutputValT>>
{
@Override
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
index 26e011a..867d1c1 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.classification.InterfaceStability;
*/
@InterfaceStability.Evolving
public class WindowedOperatorImpl<InputT, AccumT, OutputT>
- extends AbstractWindowedOperator<InputT, OutputT, WindowedStorage.WindowedPlainStorage<AccumT>, WindowedStorage.WindowedPlainStorage<OutputT>, Accumulation<InputT, AccumT, OutputT>>
+ extends AbstractWindowedOperator<InputT, OutputT, WindowedStorage.WindowedPlainStorage<AccumT>, WindowedStorage.WindowedPlainStorage<OutputT>, Accumulation<? super InputT, AccumT, OutputT>>
{
@Override
public void accumulateTuple(Tuple.WindowedTuple<InputT> tuple)
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java
index 4587a91..cdc48a1 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java
@@ -20,10 +20,10 @@ package org.apache.apex.malhar.lib.window.accumulation;
import org.junit.Assert;
import org.junit.Test;
-import org.apache.commons.lang.mutable.MutableDouble;
-import org.apache.commons.lang.mutable.MutableFloat;
-import org.apache.commons.lang.mutable.MutableInt;
-import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.commons.lang3.mutable.MutableDouble;
+import org.apache.commons.lang3.mutable.MutableFloat;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.lang3.mutable.MutableLong;
/**
* Test for different Sum Accumulations.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
index 5866a4c..9087f35 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
@@ -28,9 +28,9 @@ import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.lib.window.WindowState;
-import org.apache.apex.malhar.lib.window.accumulation.Count;
import org.apache.apex.malhar.lib.window.accumulation.FoldFn;
import org.apache.apex.malhar.lib.window.accumulation.ReduceFn;
+import org.apache.apex.malhar.lib.window.accumulation.SumLong;
import org.apache.apex.malhar.lib.window.accumulation.TopN;
import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
@@ -99,7 +99,7 @@ public class ApexWindowedStreamImpl<T> extends ApexStreamImpl<T> implements Wind
};
WindowedStream<Tuple<Long>> innerstream = map(kVMap);
- WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createWindowedOperator(new Count());
+ WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createWindowedOperator(new SumLong());
return innerstream.addOperator(windowedOperator, windowedOperator.input, windowedOperator.output, opts);
}
@@ -107,7 +107,7 @@ public class ApexWindowedStreamImpl<T> extends ApexStreamImpl<T> implements Wind
public <K, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, Long>>>> STREAM countByKey(Function.ToKeyValue<T, K, Long> convertToKeyValue, Option... opts)
{
WindowedStream<Tuple<KeyValPair<K, Long>>> kvstream = map(convertToKeyValue);
- KeyedWindowedOperatorImpl<K, Long, MutableLong, Long> keyedWindowedOperator = createKeyedWindowedOperator(new Count());
+ KeyedWindowedOperatorImpl<K, Long, MutableLong, Long> keyedWindowedOperator = createKeyedWindowedOperator(new SumLong());
return kvstream.addOperator(keyedWindowedOperator, keyedWindowedOperator.input, keyedWindowedOperator.output, opts);
}
@@ -231,7 +231,7 @@ public class ApexWindowedStreamImpl<T> extends ApexStreamImpl<T> implements Wind
* @param <OUT>
* @return
*/
- private <IN, ACCU, OUT> WindowedOperatorImpl<IN, ACCU, OUT> createWindowedOperator(Accumulation<IN, ACCU, OUT> accumulationFn)
+ private <IN, ACCU, OUT> WindowedOperatorImpl<IN, ACCU, OUT> createWindowedOperator(Accumulation<? super IN, ACCU, OUT> accumulationFn)
{
WindowedOperatorImpl<IN, ACCU, OUT> windowedOperator = new WindowedOperatorImpl<>();
//TODO use other default setting in the future
@@ -251,7 +251,7 @@ public class ApexWindowedStreamImpl<T> extends ApexStreamImpl<T> implements Wind
return windowedOperator;
}
- private <K, V, ACCU, OUT> KeyedWindowedOperatorImpl<K, V, ACCU, OUT> createKeyedWindowedOperator(Accumulation<V, ACCU, OUT> accumulationFn)
+ private <K, V, ACCU, OUT> KeyedWindowedOperatorImpl<K, V, ACCU, OUT> createKeyedWindowedOperator(Accumulation<? super V, ACCU, OUT> accumulationFn)
{
KeyedWindowedOperatorImpl<K, V, ACCU, OUT> keyedWindowedOperator = new KeyedWindowedOperatorImpl<>();
[2/2] apex-malhar git commit: APEXMALHAR-2343 #resolve #comment Count
Accumulation should only increase one for each tuple
Posted by hs...@apache.org.
APEXMALHAR-2343 #resolve #comment Count Accumulation should only increase one for each tuple
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/e4763344
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/e4763344
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/e4763344
Branch: refs/heads/master
Commit: e4763344b1c0f54e2501fbb97c12100360e4a58d
Parents: 70caa89
Author: brightchen <br...@datatorrent.com>
Authored: Wed Nov 16 11:29:46 2016 -0800
Committer: Siyuan Hua <hs...@apache.org>
Committed: Wed Jan 4 22:59:05 2017 -0800
----------------------------------------------------------------------
.../org/apache/apex/malhar/lib/window/accumulation/Count.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e4763344/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
index 62c5678..7a46e22 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
@@ -26,7 +26,7 @@ import org.apache.commons.lang3.mutable.MutableLong;
*
* @since 3.5.0
*/
-public class Count implements Accumulation<Long, MutableLong, Long>
+public class Count<T> implements Accumulation<T, MutableLong, Long>
{
@Override
@@ -36,9 +36,9 @@ public class Count implements Accumulation<Long, MutableLong, Long>
}
@Override
- public MutableLong accumulate(MutableLong accumulatedValue, Long input)
+ public MutableLong accumulate(MutableLong accumulatedValue, T input)
{
- accumulatedValue.add(input);
+ accumulatedValue.increment();
return accumulatedValue;
}