You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/08/25 23:28:13 UTC
[2/2] apex-malhar git commit: APEXMALHAR-2202 Moved accumulations to
org.apache.apex.malhar.lib.window.accumulation.
APEXMALHAR-2202 Moved accumulations to org.apache.apex.malhar.lib.window.accumulation.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/2b2d5bca
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/2b2d5bca
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/2b2d5bca
Branch: refs/heads/master
Commit: 2b2d5bca91b23569e04cbc51a541126588eaab44
Parents: dcca775
Author: Shunxin <lu...@hotmail.com>
Authored: Thu Aug 25 13:37:46 2016 -0700
Committer: Shunxin <lu...@hotmail.com>
Committed: Thu Aug 25 13:37:46 2016 -0700
----------------------------------------------------------------------
.../sample/complete/TopWikipediaSessions.java | 2 +-
.../stream/sample/complete/TrafficRoutes.java | 2 +-
.../sample/cookbook/CombinePerKeyExamples.java | 2 +-
.../stream/sample/cookbook/DeDupExample.java | 2 +-
.../sample/cookbook/MaxPerKeyExamples.java | 2 +-
.../malhar/lib/window/accumulation/Average.java | 64 +++++++++
.../malhar/lib/window/accumulation/Count.java | 61 +++++++++
.../malhar/lib/window/accumulation/FoldFn.java | 65 ++++++++++
.../malhar/lib/window/accumulation/Group.java | 63 +++++++++
.../malhar/lib/window/accumulation/Max.java | 75 +++++++++++
.../malhar/lib/window/accumulation/Min.java | 76 +++++++++++
.../lib/window/accumulation/ReduceFn.java | 65 ++++++++++
.../window/accumulation/RemoveDuplicates.java | 72 +++++++++++
.../lib/window/accumulation/SumDouble.java | 60 +++++++++
.../lib/window/accumulation/SumFloat.java | 60 +++++++++
.../malhar/lib/window/accumulation/SumInt.java | 60 +++++++++
.../malhar/lib/window/accumulation/SumLong.java | 60 +++++++++
.../malhar/lib/window/accumulation/TopN.java | 106 +++++++++++++++
.../lib/window/accumulation/TopNByKey.java | 114 ++++++++++++++++
.../lib/window/impl/accumulation/Average.java | 64 ---------
.../lib/window/impl/accumulation/Count.java | 61 ---------
.../lib/window/impl/accumulation/FoldFn.java | 65 ----------
.../lib/window/impl/accumulation/Group.java | 63 ---------
.../lib/window/impl/accumulation/Max.java | 75 -----------
.../lib/window/impl/accumulation/Min.java | 76 -----------
.../lib/window/impl/accumulation/ReduceFn.java | 65 ----------
.../impl/accumulation/RemoveDuplicates.java | 72 -----------
.../lib/window/impl/accumulation/SumDouble.java | 60 ---------
.../lib/window/impl/accumulation/SumFloat.java | 60 ---------
.../lib/window/impl/accumulation/SumInt.java | 60 ---------
.../lib/window/impl/accumulation/SumLong.java | 60 ---------
.../lib/window/impl/accumulation/TopN.java | 106 ---------------
.../lib/window/impl/accumulation/TopNByKey.java | 114 ----------------
.../lib/window/accumulation/AverageTest.java | 41 ++++++
.../lib/window/accumulation/FoldFnTest.java | 129 +++++++++++++++++++
.../lib/window/accumulation/GroupTest.java | 41 ++++++
.../malhar/lib/window/accumulation/MaxTest.java | 53 ++++++++
.../malhar/lib/window/accumulation/MinTest.java | 53 ++++++++
.../lib/window/accumulation/ReduceFnTest.java | 50 +++++++
.../accumulation/RemoveDuplicatesTest.java | 41 ++++++
.../malhar/lib/window/accumulation/SumTest.java | 57 ++++++++
.../lib/window/accumulation/TopNByKeyTest.java | 74 +++++++++++
.../window/impl/accumulation/AverageTest.java | 41 ------
.../window/impl/accumulation/FoldFnTest.java | 129 -------------------
.../lib/window/impl/accumulation/GroupTest.java | 42 ------
.../lib/window/impl/accumulation/MaxTest.java | 53 --------
.../lib/window/impl/accumulation/MinTest.java | 53 --------
.../window/impl/accumulation/ReduceFnTest.java | 50 -------
.../impl/accumulation/RemoveDuplicatesTest.java | 42 ------
.../lib/window/impl/accumulation/SumTest.java | 57 --------
.../window/impl/accumulation/TopNByKeyTest.java | 75 -----------
.../apex/malhar/stream/api/WindowedStream.java | 4 +-
.../stream/api/impl/ApexWindowedStreamImpl.java | 8 +-
53 files changed, 1551 insertions(+), 1554 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
index de4e590..f2e70b1 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
@@ -29,7 +29,7 @@ import org.joda.time.Duration;
import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.lib.window.impl.accumulation.TopN;
+import org.apache.apex.malhar.lib.window.accumulation.TopN;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
import org.apache.apex.malhar.stream.api.WindowedStream;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
index 2cc04d1..26a2823 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
@@ -32,7 +32,7 @@ import org.joda.time.Duration;
import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.lib.window.impl.accumulation.Group;
+import org.apache.apex.malhar.lib.window.accumulation.Group;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
import org.apache.apex.malhar.stream.api.WindowedStream;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
index ecd71ae..d88a8dc 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
@@ -24,7 +24,7 @@ import java.util.List;
import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.lib.window.impl.accumulation.ReduceFn;
+import org.apache.apex.malhar.lib.window.accumulation.ReduceFn;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
import org.apache.apex.malhar.stream.api.WindowedStream;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
index 53426f3..2930010 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
@@ -26,7 +26,7 @@ import org.joda.time.Duration;
import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.lib.window.impl.accumulation.RemoveDuplicates;
+import org.apache.apex.malhar.lib.window.accumulation.RemoveDuplicates;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.function.Function;
import org.apache.apex.malhar.stream.api.impl.StreamFactory;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
index 97b2696..02980e4 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
@@ -24,7 +24,7 @@ import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.Window;
import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.lib.window.impl.accumulation.Max;
+import org.apache.apex.malhar.lib.window.accumulation.Max;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
import org.apache.apex.malhar.stream.api.WindowedStream;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java
new file mode 100644
index 0000000..c669439
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang3.tuple.MutablePair;
+
+/**
+ * Average Accumulation
+ */
+public class Average implements Accumulation<Double, MutablePair<Double, Long>, Double>
+{
+ @Override
+ public MutablePair<Double, Long> defaultAccumulatedValue()
+ {
+ return new MutablePair<>(0.0, 0L);
+ }
+
+ @Override
+ public MutablePair<Double, Long> accumulate(MutablePair<Double, Long> accu, Double input)
+ {
+ accu.setLeft(accu.getLeft() * ((double)accu.getRight() / (accu.getRight() + 1)) + input / (accu.getRight() + 1));
+ accu.setRight(accu.getRight() + 1);
+ return accu;
+ }
+
+ @Override
+ public MutablePair<Double, Long> merge(MutablePair<Double, Long> accu1, MutablePair<Double, Long> accu2)
+ {
+ accu1.setLeft(accu1.getLeft() * ((double)accu1.getRight() / accu1.getRight() + accu2.getRight()) +
+ accu2.getLeft() * ((double)accu2.getRight() / accu1.getRight() + accu2.getRight()));
+ accu1.setRight(accu1.getRight() + accu2.getRight());
+ return accu1;
+ }
+
+ @Override
+ public Double getOutput(MutablePair<Double, Long> accumulatedValue)
+ {
+ return accumulatedValue.getLeft();
+ }
+
+ @Override
+ public Double getRetraction(Double value)
+ {
+ // TODO: Need to add implementation for retraction.
+ return 0.0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
new file mode 100644
index 0000000..180152b
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang3.mutable.MutableLong;
+
+/**
+ * Count Accumulation
+ */
+public class Count implements Accumulation<Long, MutableLong, Long>
+{
+
+ @Override
+ public MutableLong defaultAccumulatedValue()
+ {
+ return new MutableLong(0);
+ }
+
+ @Override
+ public MutableLong accumulate(MutableLong accumulatedValue, Long input)
+ {
+ accumulatedValue.add(input);
+ return accumulatedValue;
+ }
+
+ @Override
+ public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2)
+ {
+ accumulatedValue1.add(accumulatedValue2);
+ return accumulatedValue1;
+ }
+
+ @Override
+ public Long getOutput(MutableLong accumulatedValue)
+ {
+ return accumulatedValue.getValue();
+ }
+
+ @Override
+ public Long getRetraction(Long value)
+ {
+ return -value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/FoldFn.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/FoldFn.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/FoldFn.java
new file mode 100644
index 0000000..dc344b3
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/FoldFn.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * Fold Accumulation Adaptor class
+ */
+public abstract class FoldFn<INPUT, OUTPUT> implements Accumulation<INPUT, OUTPUT, OUTPUT>
+{
+
+ public FoldFn()
+ {
+ }
+
+ public FoldFn(OUTPUT initialVal)
+ {
+ this.initialVal = initialVal;
+ }
+
+ private OUTPUT initialVal;
+
+ @Override
+ public OUTPUT defaultAccumulatedValue()
+ {
+ return initialVal;
+ }
+
+ @Override
+ public OUTPUT accumulate(OUTPUT accumulatedValue, INPUT input)
+ {
+ return fold(accumulatedValue, input);
+ }
+
+ @Override
+ public OUTPUT getOutput(OUTPUT accumulatedValue)
+ {
+ return accumulatedValue;
+ }
+
+ @Override
+ public OUTPUT getRetraction(OUTPUT value)
+ {
+ return null;
+ }
+
+ abstract OUTPUT fold(OUTPUT result, INPUT input);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java
new file mode 100644
index 0000000..c34a8ac
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * Group accumulation.
+ */
+public class Group<T> implements Accumulation<T, List<T>, List<T>>
+{
+ @Override
+ public List<T> defaultAccumulatedValue()
+ {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public List<T> accumulate(List<T> accumulatedValue, T input)
+ {
+ accumulatedValue.add(input);
+ return accumulatedValue;
+ }
+
+ @Override
+ public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2)
+ {
+ accumulatedValue1.addAll(accumulatedValue2);
+ return accumulatedValue1;
+ }
+
+ @Override
+ public List<T> getOutput(List<T> accumulatedValue)
+ {
+ return accumulatedValue;
+ }
+
+ @Override
+ public List<T> getRetraction(List<T> value)
+ {
+ // TODO: Need to add implementation for retraction.
+ return new ArrayList<>();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java
new file mode 100644
index 0000000..4164bb2
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.Comparator;
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * Max accumulation.
+ */
+public class Max<T> implements Accumulation<T, T, T>
+{
+
+ Comparator<T> comparator;
+
+ public void setComparator(Comparator<T> comparator)
+ {
+ this.comparator = comparator;
+ }
+
+ @Override
+ public T defaultAccumulatedValue()
+ {
+ return null;
+ }
+
+ @Override
+ public T accumulate(T accumulatedValue, T input)
+ {
+ if (accumulatedValue == null) {
+ return input;
+ } else if (comparator != null) {
+ return (comparator.compare(input, accumulatedValue) > 0) ? input : accumulatedValue;
+ } else if (input instanceof Comparable) {
+ return (((Comparable)input).compareTo(accumulatedValue) > 0) ? input : accumulatedValue;
+ } else {
+ throw new RuntimeException("Tuple cannot be compared");
+ }
+ }
+
+ @Override
+ public T merge(T accumulatedValue1, T accumulatedValue2)
+ {
+ return accumulate(accumulatedValue1, accumulatedValue2);
+ }
+
+ @Override
+ public T getOutput(T accumulatedValue)
+ {
+ return accumulatedValue;
+ }
+
+ @Override
+ public T getRetraction(T value)
+ {
+ // TODO: Need to add implementation for retraction.
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java
new file mode 100644
index 0000000..0460052
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.Comparator;
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * Min accumulation
+ */
+public class Min<T> implements Accumulation<T, T, T>
+{
+
+ Comparator<T> comparator;
+
+ public void setComparator(Comparator<T> comparator)
+ {
+ this.comparator = comparator;
+ }
+
+ @Override
+ public T defaultAccumulatedValue()
+ {
+ return null;
+ }
+
+ @Override
+ public T accumulate(T accumulatedValue, T input)
+ {
+ if (accumulatedValue == null) {
+ return input;
+ } else if (comparator != null) {
+ return (comparator.compare(input, accumulatedValue) < 0) ? input : accumulatedValue;
+ } else if (input instanceof Comparable) {
+ return (((Comparable)input).compareTo(accumulatedValue) < 0) ? input : accumulatedValue;
+ } else {
+ throw new RuntimeException("Tuple cannot be compared");
+ }
+ }
+
+ @Override
+ public T merge(T accumulatedValue1, T accumulatedValue2)
+ {
+ return accumulate(accumulatedValue1, accumulatedValue2);
+ }
+
+ @Override
+ public T getOutput(T accumulatedValue)
+ {
+ return accumulatedValue;
+ }
+
+ @Override
+ public T getRetraction(T value)
+ {
+ // TODO: Need to add implementation for retraction.
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFn.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFn.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFn.java
new file mode 100644
index 0000000..0c34c75
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFn.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * An easy to use reduce Accumulation
+ * @param <INPUT>
+ */
+public abstract class ReduceFn<INPUT> implements Accumulation<INPUT, INPUT, INPUT>
+{
+ @Override
+ public INPUT defaultAccumulatedValue()
+ {
+ return null;
+ }
+
+ @Override
+ public INPUT accumulate(INPUT accumulatedValue, INPUT input)
+ {
+ if (accumulatedValue == null) {
+ return input;
+ }
+ return reduce(accumulatedValue, input);
+ }
+
+ @Override
+ public INPUT merge(INPUT accumulatedValue1, INPUT accumulatedValue2)
+ {
+ return reduce(accumulatedValue1, accumulatedValue2);
+ }
+
+ @Override
+ public INPUT getOutput(INPUT accumulatedValue)
+ {
+ return accumulatedValue;
+ }
+
+ @Override
+ public INPUT getRetraction(INPUT value)
+ {
+ return null;
+ }
+
+ public abstract INPUT reduce(INPUT input1, INPUT input2);
+
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java
new file mode 100644
index 0000000..8eee868
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * RemoveDuplicates Accumulation.
+ * @param <T>
+ */
+public class RemoveDuplicates<T> implements Accumulation<T, Set<T>, List<T>>
+{
+ @Override
+ public Set<T> defaultAccumulatedValue()
+ {
+ return new HashSet<>();
+ }
+
+ @Override
+ public Set<T> accumulate(Set<T> accumulatedValue, T input)
+ {
+ accumulatedValue.add(input);
+ return accumulatedValue;
+ }
+
+ @Override
+ public Set<T> merge(Set<T> accumulatedValue1, Set<T> accumulatedValue2)
+ {
+ for (T item : accumulatedValue2) {
+ accumulatedValue1.add(item);
+ }
+ return accumulatedValue1;
+ }
+
+ @Override
+ public List<T> getOutput(Set<T> accumulatedValue)
+ {
+ if (accumulatedValue == null) {
+ return new ArrayList<>();
+ } else {
+ return new ArrayList<>(accumulatedValue);
+ }
+ }
+
+ @Override
+ public List<T> getRetraction(List<T> value)
+ {
+ // TODO: Need to add implementation for retraction.
+ return new ArrayList<>(value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
new file mode 100644
index 0000000..fafff43
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang.mutable.MutableDouble;
+
+/**
+ * Sum Accumulation for doubles.
+ */
+public class SumDouble implements Accumulation<Double, MutableDouble, Double>
+{
+ @Override
+ public MutableDouble defaultAccumulatedValue()
+ {
+ return new MutableDouble(0.0);
+ }
+
+ @Override
+ public MutableDouble accumulate(MutableDouble accumulatedValue, Double input)
+ {
+ accumulatedValue.add(input);
+ return accumulatedValue;
+ }
+
+ @Override
+ public MutableDouble merge(MutableDouble accumulatedValue1, MutableDouble accumulatedValue2)
+ {
+ accumulatedValue1.add(accumulatedValue2);
+ return accumulatedValue1;
+ }
+
+ @Override
+ public Double getOutput(MutableDouble accumulatedValue)
+ {
+ return accumulatedValue.doubleValue();
+ }
+
+ @Override
+ public Double getRetraction(Double value)
+ {
+ return -value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
new file mode 100644
index 0000000..2e2d3ef
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang.mutable.MutableFloat;
+
+/**
+ * Sum Accumulation for floats.
+ */
+public class SumFloat implements Accumulation<Float, MutableFloat, Float>
+{
+ @Override
+ public MutableFloat defaultAccumulatedValue()
+ {
+ return new MutableFloat(0.);
+ }
+
+ @Override
+ public MutableFloat accumulate(MutableFloat accumulatedValue, Float input)
+ {
+ accumulatedValue.add(input);
+ return accumulatedValue;
+ }
+
+ @Override
+ public MutableFloat merge(MutableFloat accumulatedValue1, MutableFloat accumulatedValue2)
+ {
+ accumulatedValue1.add(accumulatedValue2);
+ return accumulatedValue1;
+ }
+
+ @Override
+ public Float getOutput(MutableFloat accumulatedValue)
+ {
+ return accumulatedValue.floatValue();
+ }
+
+ @Override
+ public Float getRetraction(Float value)
+ {
+ return -value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
new file mode 100644
index 0000000..c9aa4bd
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang.mutable.MutableInt;
+
+/**
+ * Sum accumulation for integers.
+ */
+public class SumInt implements Accumulation<Integer, MutableInt, Integer>
+{
+ @Override
+ public MutableInt defaultAccumulatedValue()
+ {
+ return new MutableInt(0);
+ }
+
+ @Override
+ public MutableInt accumulate(MutableInt accumulatedValue, Integer input)
+ {
+ accumulatedValue.add(input);
+ return accumulatedValue;
+ }
+
+ @Override
+ public MutableInt merge(MutableInt accumulatedValue1, MutableInt accumulatedValue2)
+ {
+ accumulatedValue1.add(accumulatedValue2);
+ return accumulatedValue1;
+ }
+
+ @Override
+ public Integer getOutput(MutableInt accumulatedValue)
+ {
+ return accumulatedValue.intValue();
+ }
+
+ @Override
+ public Integer getRetraction(Integer value)
+ {
+ return -value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
new file mode 100644
index 0000000..1268141
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang.mutable.MutableLong;
+
+/**
+ * Sum accumulation for longs.
+ */
+public class SumLong implements Accumulation<Long, MutableLong, Long>
+{
+ @Override
+ public MutableLong defaultAccumulatedValue()
+ {
+ return new MutableLong(0L);
+ }
+
+ @Override
+ public MutableLong accumulate(MutableLong accumulatedValue, Long input)
+ {
+ accumulatedValue.add(input);
+ return accumulatedValue;
+ }
+
+ @Override
+ public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2)
+ {
+ accumulatedValue1.add(accumulatedValue2);
+ return accumulatedValue1;
+ }
+
+ @Override
+ public Long getOutput(MutableLong accumulatedValue)
+ {
+ return accumulatedValue.longValue();
+ }
+
+ @Override
+ public Long getRetraction(Long value)
+ {
+ return -value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopN.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopN.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopN.java
new file mode 100644
index 0000000..c95cbb4
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopN.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * TopN accumulation
+ */
+public class TopN<T> implements Accumulation<T, List<T>, List<T>>
+{
+ int n;
+
+ Comparator<T> comparator;
+
+ public void setN(int n)
+ {
+ this.n = n;
+ }
+
+ public void setComparator(Comparator<T> comparator)
+ {
+ this.comparator = comparator;
+ }
+
+ @Override
+ public List<T> defaultAccumulatedValue()
+ {
+ return new LinkedList<>();
+ }
+
+ @Override
+ public List<T> accumulate(List<T> accumulatedValue, T input)
+ {
+ int k = 0;
+ for (T inMemory : accumulatedValue) {
+ if (comparator != null) {
+ if (comparator.compare(inMemory, input) < 0) {
+ break;
+ }
+ } else if (input instanceof Comparable) {
+ if (((Comparable<T>)input).compareTo(inMemory) > 0) {
+ break;
+ }
+ } else {
+ throw new RuntimeException("Tuple cannot be compared");
+ }
+ k++;
+ }
+ accumulatedValue.add(k, input);
+ if (accumulatedValue.size() > n) {
+ accumulatedValue.remove(accumulatedValue.get(accumulatedValue.size() - 1));
+ }
+ return accumulatedValue;
+ }
+
+ @Override
+ public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2)
+ {
+ accumulatedValue1.addAll(accumulatedValue2);
+ if (comparator != null) {
+ Collections.sort(accumulatedValue1, Collections.reverseOrder(comparator));
+ } else {
+ Collections.sort(accumulatedValue1, Collections.reverseOrder());
+ }
+ if (accumulatedValue1.size() > n) {
+ return accumulatedValue1.subList(0, n);
+ } else {
+ return accumulatedValue1;
+ }
+ }
+
+ @Override
+ public List<T> getOutput(List<T> accumulatedValue)
+ {
+ return accumulatedValue;
+ }
+
+ @Override
+ public List<T> getRetraction(List<T> accumulatedValue)
+ {
+ return new LinkedList<>();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKey.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKey.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKey.java
new file mode 100644
index 0000000..92a6eb6
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKey.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Generalized TopNByKey accumulation
+ */
+public class TopNByKey<K, V> implements
+ Accumulation<KeyValPair<K, V>, Map<K, V>, List<KeyValPair<K, V>>>
+{
+ int n = 10;
+
+ Comparator<V> comparator;
+
+ public void setN(int n)
+ {
+ this.n = n;
+ }
+
+ public void setComparator(Comparator<V> comparator)
+ {
+ this.comparator = comparator;
+ }
+
+ @Override
+ public Map<K, V> defaultAccumulatedValue()
+ {
+ return new HashMap<>();
+ }
+
+ @Override
+ public Map<K, V> accumulate(Map<K, V> accumulatedValue, KeyValPair<K, V> input)
+ {
+ accumulatedValue.put(input.getKey(), input.getValue());
+ return accumulatedValue;
+ }
+
+ @Override
+ public Map<K, V> merge(Map<K, V> accumulatedValue1, Map<K, V> accumulatedValue2)
+ {
+ for (Map.Entry<K, V> entry : accumulatedValue2.entrySet()) {
+ if (!accumulatedValue1.containsKey(entry.getKey())) {
+ accumulatedValue1.put(entry.getKey(), entry.getValue());
+ } else if (comparator != null) {
+ if (comparator.compare(entry.getValue(), accumulatedValue1.get(entry.getKey())) > 0) {
+ accumulatedValue1.put(entry.getKey(), entry.getValue());
+ }
+ } else if (entry.getValue() instanceof Comparable) {
+ if (((Comparable<V>)entry.getValue()).compareTo(accumulatedValue1.get(entry.getKey())) > 0) {
+ accumulatedValue1.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ return accumulatedValue1;
+ }
+
+ @Override
+ public List<KeyValPair<K, V>> getOutput(Map<K, V> accumulatedValue)
+ {
+ LinkedList<KeyValPair<K, V>> result = new LinkedList<>();
+ for (Map.Entry<K, V> entry : accumulatedValue.entrySet()) {
+ int k = 0;
+ for (KeyValPair<K, V> inMemory : result) {
+ if (comparator != null) {
+ if (comparator.compare(entry.getValue(), inMemory.getValue()) > 0) {
+ break;
+ }
+ } else if (entry.getValue() instanceof Comparable) {
+ if (((Comparable<V>)entry.getValue()).compareTo(inMemory.getValue()) > 0) {
+ break;
+ }
+ }
+ k++;
+ }
+ result.add(k, new KeyValPair<K, V>(entry.getKey(), entry.getValue()));
+ if (result.size() > n) {
+ result.remove(result.get(result.size() - 1));
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public List<KeyValPair<K, V>> getRetraction(List<KeyValPair<K, V>> value)
+ {
+ // TODO: Need to add implementation for retraction.
+ return new LinkedList<>();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Average.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Average.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Average.java
deleted file mode 100644
index 57db6d7..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Average.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang3.tuple.MutablePair;
-
-/**
- * Average Accumulation
- */
-public class Average implements Accumulation<Double, MutablePair<Double, Long>, Double>
-{
- @Override
- public MutablePair<Double, Long> defaultAccumulatedValue()
- {
- return new MutablePair<>(0.0, 0L);
- }
-
- @Override
- public MutablePair<Double, Long> accumulate(MutablePair<Double, Long> accu, Double input)
- {
- accu.setLeft(accu.getLeft() * ((double)accu.getRight() / (accu.getRight() + 1)) + input / (accu.getRight() + 1));
- accu.setRight(accu.getRight() + 1);
- return accu;
- }
-
- @Override
- public MutablePair<Double, Long> merge(MutablePair<Double, Long> accu1, MutablePair<Double, Long> accu2)
- {
- accu1.setLeft(accu1.getLeft() * ((double)accu1.getRight() / accu1.getRight() + accu2.getRight()) +
- accu2.getLeft() * ((double)accu2.getRight() / accu1.getRight() + accu2.getRight()));
- accu1.setRight(accu1.getRight() + accu2.getRight());
- return accu1;
- }
-
- @Override
- public Double getOutput(MutablePair<Double, Long> accumulatedValue)
- {
- return accumulatedValue.getLeft();
- }
-
- @Override
- public Double getRetraction(Double value)
- {
- // TODO: Need to add implementation for retraction.
- return 0.0;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Count.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Count.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Count.java
deleted file mode 100644
index 2c01a0b..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Count.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang3.mutable.MutableLong;
-
-/**
- * Count Accumulation
- */
-public class Count implements Accumulation<Long, MutableLong, Long>
-{
-
- @Override
- public MutableLong defaultAccumulatedValue()
- {
- return new MutableLong(0);
- }
-
- @Override
- public MutableLong accumulate(MutableLong accumulatedValue, Long input)
- {
- accumulatedValue.add(input);
- return accumulatedValue;
- }
-
- @Override
- public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2)
- {
- accumulatedValue1.add(accumulatedValue2);
- return accumulatedValue1;
- }
-
- @Override
- public Long getOutput(MutableLong accumulatedValue)
- {
- return accumulatedValue.getValue();
- }
-
- @Override
- public Long getRetraction(Long value)
- {
- return -value;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFn.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFn.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFn.java
deleted file mode 100644
index 5716cad..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFn.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * Fold Accumulation Adaptor class
- */
-public abstract class FoldFn<INPUT, OUTPUT> implements Accumulation<INPUT, OUTPUT, OUTPUT>
-{
-
- public FoldFn()
- {
- }
-
- public FoldFn(OUTPUT initialVal)
- {
- this.initialVal = initialVal;
- }
-
- private OUTPUT initialVal;
-
- @Override
- public OUTPUT defaultAccumulatedValue()
- {
- return initialVal;
- }
-
- @Override
- public OUTPUT accumulate(OUTPUT accumulatedValue, INPUT input)
- {
- return fold(accumulatedValue, input);
- }
-
- @Override
- public OUTPUT getOutput(OUTPUT accumulatedValue)
- {
- return accumulatedValue;
- }
-
- @Override
- public OUTPUT getRetraction(OUTPUT value)
- {
- return null;
- }
-
- abstract OUTPUT fold(OUTPUT result, INPUT input);
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Group.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Group.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Group.java
deleted file mode 100644
index 632cad5..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Group.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * Group accumulation.
- */
-public class Group<T> implements Accumulation<T, List<T>, List<T>>
-{
- @Override
- public List<T> defaultAccumulatedValue()
- {
- return new ArrayList<>();
- }
-
- @Override
- public List<T> accumulate(List<T> accumulatedValue, T input)
- {
- accumulatedValue.add(input);
- return accumulatedValue;
- }
-
- @Override
- public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2)
- {
- accumulatedValue1.addAll(accumulatedValue2);
- return accumulatedValue1;
- }
-
- @Override
- public List<T> getOutput(List<T> accumulatedValue)
- {
- return accumulatedValue;
- }
-
- @Override
- public List<T> getRetraction(List<T> value)
- {
- // TODO: Need to add implementation for retraction.
- return new ArrayList<>();
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Max.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Max.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Max.java
deleted file mode 100644
index 1002b49..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Max.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.Comparator;
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * Max accumulation.
- */
-public class Max<T> implements Accumulation<T, T, T>
-{
-
- Comparator<T> comparator;
-
- public void setComparator(Comparator<T> comparator)
- {
- this.comparator = comparator;
- }
-
- @Override
- public T defaultAccumulatedValue()
- {
- return null;
- }
-
- @Override
- public T accumulate(T accumulatedValue, T input)
- {
- if (accumulatedValue == null) {
- return input;
- } else if (comparator != null) {
- return (comparator.compare(input, accumulatedValue) > 0) ? input : accumulatedValue;
- } else if (input instanceof Comparable) {
- return (((Comparable)input).compareTo(accumulatedValue) > 0) ? input : accumulatedValue;
- } else {
- throw new RuntimeException("Tuple cannot be compared");
- }
- }
-
- @Override
- public T merge(T accumulatedValue1, T accumulatedValue2)
- {
- return accumulate(accumulatedValue1, accumulatedValue2);
- }
-
- @Override
- public T getOutput(T accumulatedValue)
- {
- return accumulatedValue;
- }
-
- @Override
- public T getRetraction(T value)
- {
- // TODO: Need to add implementation for retraction.
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Min.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Min.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Min.java
deleted file mode 100644
index 66248f4..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Min.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.Comparator;
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * Min accumulation
- */
-public class Min<T> implements Accumulation<T, T, T>
-{
-
- Comparator<T> comparator;
-
- public void setComparator(Comparator<T> comparator)
- {
- this.comparator = comparator;
- }
-
- @Override
- public T defaultAccumulatedValue()
- {
- return null;
- }
-
- @Override
- public T accumulate(T accumulatedValue, T input)
- {
- if (accumulatedValue == null) {
- return input;
- } else if (comparator != null) {
- return (comparator.compare(input, accumulatedValue) < 0) ? input : accumulatedValue;
- } else if (input instanceof Comparable) {
- return (((Comparable)input).compareTo(accumulatedValue) < 0) ? input : accumulatedValue;
- } else {
- throw new RuntimeException("Tuple cannot be compared");
- }
- }
-
- @Override
- public T merge(T accumulatedValue1, T accumulatedValue2)
- {
- return accumulate(accumulatedValue1, accumulatedValue2);
- }
-
- @Override
- public T getOutput(T accumulatedValue)
- {
- return accumulatedValue;
- }
-
- @Override
- public T getRetraction(T value)
- {
- // TODO: Need to add implementation for retraction.
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFn.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFn.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFn.java
deleted file mode 100644
index c21ab32..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFn.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * An easy to use reduce Accumulation
- * @param <INPUT>
- */
-public abstract class ReduceFn<INPUT> implements Accumulation<INPUT, INPUT, INPUT>
-{
- @Override
- public INPUT defaultAccumulatedValue()
- {
- return null;
- }
-
- @Override
- public INPUT accumulate(INPUT accumulatedValue, INPUT input)
- {
- if (accumulatedValue == null) {
- return input;
- }
- return reduce(accumulatedValue, input);
- }
-
- @Override
- public INPUT merge(INPUT accumulatedValue1, INPUT accumulatedValue2)
- {
- return reduce(accumulatedValue1, accumulatedValue2);
- }
-
- @Override
- public INPUT getOutput(INPUT accumulatedValue)
- {
- return accumulatedValue;
- }
-
- @Override
- public INPUT getRetraction(INPUT value)
- {
- return null;
- }
-
- public abstract INPUT reduce(INPUT input1, INPUT input2);
-
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicates.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicates.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicates.java
deleted file mode 100644
index b7cd770..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicates.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * RemoveDuplicates Accumulation.
- * @param <T>
- */
-public class RemoveDuplicates<T> implements Accumulation<T, Set<T>, List<T>>
-{
- @Override
- public Set<T> defaultAccumulatedValue()
- {
- return new HashSet<>();
- }
-
- @Override
- public Set<T> accumulate(Set<T> accumulatedValue, T input)
- {
- accumulatedValue.add(input);
- return accumulatedValue;
- }
-
- @Override
- public Set<T> merge(Set<T> accumulatedValue1, Set<T> accumulatedValue2)
- {
- for (T item : accumulatedValue2) {
- accumulatedValue1.add(item);
- }
- return accumulatedValue1;
- }
-
- @Override
- public List<T> getOutput(Set<T> accumulatedValue)
- {
- if (accumulatedValue == null) {
- return new ArrayList<>();
- } else {
- return new ArrayList<>(accumulatedValue);
- }
- }
-
- @Override
- public List<T> getRetraction(List<T> value)
- {
- // TODO: Need to add implementation for retraction.
- return new ArrayList<>(value);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumDouble.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumDouble.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumDouble.java
deleted file mode 100644
index 60b195b..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumDouble.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang.mutable.MutableDouble;
-
-/**
- * Sum Accumulation for doubles.
- */
-public class SumDouble implements Accumulation<Double, MutableDouble, Double>
-{
- @Override
- public MutableDouble defaultAccumulatedValue()
- {
- return new MutableDouble(0.0);
- }
-
- @Override
- public MutableDouble accumulate(MutableDouble accumulatedValue, Double input)
- {
- accumulatedValue.add(input);
- return accumulatedValue;
- }
-
- @Override
- public MutableDouble merge(MutableDouble accumulatedValue1, MutableDouble accumulatedValue2)
- {
- accumulatedValue1.add(accumulatedValue2);
- return accumulatedValue1;
- }
-
- @Override
- public Double getOutput(MutableDouble accumulatedValue)
- {
- return accumulatedValue.doubleValue();
- }
-
- @Override
- public Double getRetraction(Double value)
- {
- return -value;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumFloat.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumFloat.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumFloat.java
deleted file mode 100644
index 14e69e2..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumFloat.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang.mutable.MutableFloat;
-
-/**
- * Sum Accumulation for floats.
- */
-public class SumFloat implements Accumulation<Float, MutableFloat, Float>
-{
- @Override
- public MutableFloat defaultAccumulatedValue()
- {
- return new MutableFloat(0.);
- }
-
- @Override
- public MutableFloat accumulate(MutableFloat accumulatedValue, Float input)
- {
- accumulatedValue.add(input);
- return accumulatedValue;
- }
-
- @Override
- public MutableFloat merge(MutableFloat accumulatedValue1, MutableFloat accumulatedValue2)
- {
- accumulatedValue1.add(accumulatedValue2);
- return accumulatedValue1;
- }
-
- @Override
- public Float getOutput(MutableFloat accumulatedValue)
- {
- return accumulatedValue.floatValue();
- }
-
- @Override
- public Float getRetraction(Float value)
- {
- return -value;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumInt.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumInt.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumInt.java
deleted file mode 100644
index 886a7d0..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumInt.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang.mutable.MutableInt;
-
-/**
- * Sum accumulation for integers.
- */
-public class SumInt implements Accumulation<Integer, MutableInt, Integer>
-{
- @Override
- public MutableInt defaultAccumulatedValue()
- {
- return new MutableInt(0);
- }
-
- @Override
- public MutableInt accumulate(MutableInt accumulatedValue, Integer input)
- {
- accumulatedValue.add(input);
- return accumulatedValue;
- }
-
- @Override
- public MutableInt merge(MutableInt accumulatedValue1, MutableInt accumulatedValue2)
- {
- accumulatedValue1.add(accumulatedValue2);
- return accumulatedValue1;
- }
-
- @Override
- public Integer getOutput(MutableInt accumulatedValue)
- {
- return accumulatedValue.intValue();
- }
-
- @Override
- public Integer getRetraction(Integer value)
- {
- return -value;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumLong.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumLong.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumLong.java
deleted file mode 100644
index 469eef9..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumLong.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang.mutable.MutableLong;
-
-/**
- * Sum accumulation for longs.
- */
-public class SumLong implements Accumulation<Long, MutableLong, Long>
-{
- @Override
- public MutableLong defaultAccumulatedValue()
- {
- return new MutableLong(0L);
- }
-
- @Override
- public MutableLong accumulate(MutableLong accumulatedValue, Long input)
- {
- accumulatedValue.add(input);
- return accumulatedValue;
- }
-
- @Override
- public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2)
- {
- accumulatedValue1.add(accumulatedValue2);
- return accumulatedValue1;
- }
-
- @Override
- public Long getOutput(MutableLong accumulatedValue)
- {
- return accumulatedValue.longValue();
- }
-
- @Override
- public Long getRetraction(Long value)
- {
- return -value;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopN.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopN.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopN.java
deleted file mode 100644
index 7dad8cc..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopN.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * TopN accumulation
- */
-public class TopN<T> implements Accumulation<T, List<T>, List<T>>
-{
- int n;
-
- Comparator<T> comparator;
-
- public void setN(int n)
- {
- this.n = n;
- }
-
- public void setComparator(Comparator<T> comparator)
- {
- this.comparator = comparator;
- }
-
- @Override
- public List<T> defaultAccumulatedValue()
- {
- return new LinkedList<>();
- }
-
- @Override
- public List<T> accumulate(List<T> accumulatedValue, T input)
- {
- int k = 0;
- for (T inMemory : accumulatedValue) {
- if (comparator != null) {
- if (comparator.compare(inMemory, input) < 0) {
- break;
- }
- } else if (input instanceof Comparable) {
- if (((Comparable<T>)input).compareTo(inMemory) > 0) {
- break;
- }
- } else {
- throw new RuntimeException("Tuple cannot be compared");
- }
- k++;
- }
- accumulatedValue.add(k, input);
- if (accumulatedValue.size() > n) {
- accumulatedValue.remove(accumulatedValue.get(accumulatedValue.size() - 1));
- }
- return accumulatedValue;
- }
-
- @Override
- public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2)
- {
- accumulatedValue1.addAll(accumulatedValue2);
- if (comparator != null) {
- Collections.sort(accumulatedValue1, Collections.reverseOrder(comparator));
- } else {
- Collections.sort(accumulatedValue1, Collections.reverseOrder());
- }
- if (accumulatedValue1.size() > n) {
- return accumulatedValue1.subList(0, n);
- } else {
- return accumulatedValue1;
- }
- }
-
- @Override
- public List<T> getOutput(List<T> accumulatedValue)
- {
- return accumulatedValue;
- }
-
- @Override
- public List<T> getRetraction(List<T> accumulatedValue)
- {
- return new LinkedList<>();
- }
-}
-
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKey.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKey.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKey.java
deleted file mode 100644
index d9f9cfd..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKey.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- * Generalized TopNByKey accumulation
- */
-public class TopNByKey<K, V> implements
- Accumulation<KeyValPair<K, V>, Map<K, V>, List<KeyValPair<K, V>>>
-{
- int n = 10;
-
- Comparator<V> comparator;
-
- public void setN(int n)
- {
- this.n = n;
- }
-
- public void setComparator(Comparator<V> comparator)
- {
- this.comparator = comparator;
- }
-
- @Override
- public Map<K, V> defaultAccumulatedValue()
- {
- return new HashMap<>();
- }
-
- @Override
- public Map<K, V> accumulate(Map<K, V> accumulatedValue, KeyValPair<K, V> input)
- {
- accumulatedValue.put(input.getKey(), input.getValue());
- return accumulatedValue;
- }
-
- @Override
- public Map<K, V> merge(Map<K, V> accumulatedValue1, Map<K, V> accumulatedValue2)
- {
- for (Map.Entry<K, V> entry : accumulatedValue2.entrySet()) {
- if (!accumulatedValue1.containsKey(entry.getKey())) {
- accumulatedValue1.put(entry.getKey(), entry.getValue());
- } else if (comparator != null) {
- if (comparator.compare(entry.getValue(), accumulatedValue1.get(entry.getKey())) > 0) {
- accumulatedValue1.put(entry.getKey(), entry.getValue());
- }
- } else if (entry.getValue() instanceof Comparable) {
- if (((Comparable<V>)entry.getValue()).compareTo(accumulatedValue1.get(entry.getKey())) > 0) {
- accumulatedValue1.put(entry.getKey(), entry.getValue());
- }
- }
- }
- return accumulatedValue1;
- }
-
- @Override
- public List<KeyValPair<K, V>> getOutput(Map<K, V> accumulatedValue)
- {
- LinkedList<KeyValPair<K, V>> result = new LinkedList<>();
- for (Map.Entry<K, V> entry : accumulatedValue.entrySet()) {
- int k = 0;
- for (KeyValPair<K, V> inMemory : result) {
- if (comparator != null) {
- if (comparator.compare(entry.getValue(), inMemory.getValue()) > 0) {
- break;
- }
- } else if (entry.getValue() instanceof Comparable) {
- if (((Comparable<V>)entry.getValue()).compareTo(inMemory.getValue()) > 0) {
- break;
- }
- }
- k++;
- }
- result.add(k, new KeyValPair<K, V>(entry.getKey(), entry.getValue()));
- if (result.size() > n) {
- result.remove(result.get(result.size() - 1));
- }
- }
- return result;
- }
-
- @Override
- public List<KeyValPair<K, V>> getRetraction(List<KeyValPair<K, V>> value)
- {
- // TODO: Need to add implementation for retraction.
- return new LinkedList<>();
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/AverageTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/AverageTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/AverageTest.java
new file mode 100644
index 0000000..e5fd541
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/AverageTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.commons.lang3.tuple.MutablePair;
+
+/**
+ * Test for {@link Average}.
+ */
+public class AverageTest
+{
+ @Test
+ public void AverageTest()
+ {
+ Average ave = new Average();
+ MutablePair<Double, Long> accu = ave.defaultAccumulatedValue();
+
+ for (int i = 1; i <= 10; i++) {
+ accu = ave.accumulate(accu, (double)i);
+ }
+ Assert.assertTrue(5.5 == accu.getLeft());
+ }
+}