You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/08/25 23:28:12 UTC
[1/2] apex-malhar git commit: APEXMALHAR-2202 Moved accumulations to
org.apache.apex.malhar.lib.window.accumulation.
Repository: apex-malhar
Updated Branches:
refs/heads/master dcca7752a -> 2b2d5bca9
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/FoldFnTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/FoldFnTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/FoldFnTest.java
new file mode 100644
index 0000000..cda6bf8
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/FoldFnTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.apex.malhar.lib.window.Tuple;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Test for {@link FoldFn}.
+ */
+public class FoldFnTest
+{
+ public static class NumGen extends BaseOperator implements InputOperator
+ {
+ public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
+
+ public static int count = 0;
+ private int i = 0;
+
+ public NumGen()
+ {
+ count = 0;
+ i = 0;
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ while (i <= 7) {
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ // Interruption is deliberately ignored; the loop just carries on with the next tuple.
+ }
+ count++;
+ if (i >= 0) {
+ output.emit(i++);
+ }
+ }
+ i = -1;
+ }
+ }
+
+ public static class Collector extends BaseOperator
+ {
+ private static int result;
+
+ public transient DefaultInputPort<Tuple.WindowedTuple<Integer>> input = new DefaultInputPort<Tuple.WindowedTuple<Integer>>()
+ {
+ @Override
+ public void process(Tuple.WindowedTuple<Integer> tuple)
+ {
+ result = tuple.getValue();
+ }
+ };
+
+ public int getResult()
+ {
+ return result;
+ }
+ }
+
+ public static class Plus extends FoldFn<Integer, Integer>
+ {
+ @Override
+ public Integer merge(Integer accumulatedValue1, Integer accumulatedValue2)
+ {
+ return fold(accumulatedValue1, accumulatedValue2);
+ }
+
+ @Override
+ public Integer fold(Integer input1, Integer input2)
+ {
+ if (input1 == null) {
+ return input2;
+ }
+ return input1 + input2;
+ }
+ }
+
+ @Test
+ public void FoldFnTest()
+ {
+
+ FoldFn<String, String> concat = new FoldFn<String, String>()
+ {
+ @Override
+ public String merge(String accumulatedValue1, String accumulatedValue2)
+ {
+ return fold(accumulatedValue1, accumulatedValue2);
+ }
+
+ @Override
+ public String fold(String input1, String input2)
+ {
+ return input1 + ", " + input2;
+ }
+ };
+
+ String[] ss = new String[]{"b", "c", "d", "e"};
+ String base = "a";
+
+ for (String s : ss) {
+ base = concat.accumulate(base, s);
+ }
+ Assert.assertEquals("a, b, c, d, e", base);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/GroupTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/GroupTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/GroupTest.java
new file mode 100644
index 0000000..891a824
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/GroupTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.List;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link Group}.
+ */
+public class GroupTest
+{
+ @Test
+ public void GroupTest()
+ {
+ Group<Integer> group = new Group<>();
+
+ List<Integer> accu = group.defaultAccumulatedValue();
+ Assert.assertEquals(0, accu.size());
+ Assert.assertEquals(1, group.accumulate(accu, 10).size());
+ Assert.assertEquals(2, group.accumulate(accu, 11).size());
+ Assert.assertEquals(3, group.accumulate(accu, 11).size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/MaxTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/MaxTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/MaxTest.java
new file mode 100644
index 0000000..fe87d9e
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/MaxTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.Comparator;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link Max}.
+ */
+public class MaxTest
+{
+ @Test
+ public void MaxTest()
+ {
+ Max<Integer> max = new Max<>();
+
+ Assert.assertEquals((Integer)5, max.accumulate(5, 3));
+ Assert.assertEquals((Integer)6, max.accumulate(4, 6));
+ Assert.assertEquals((Integer)5, max.merge(5, 2));
+
+ Comparator<Integer> com = new Comparator<Integer>()
+ {
+ @Override
+ public int compare(Integer o1, Integer o2)
+ {
+ return -(o1.compareTo(o2));
+ }
+ };
+
+ max.setComparator(com);
+ Assert.assertEquals((Integer)3, max.accumulate(5, 3));
+ Assert.assertEquals((Integer)4, max.accumulate(4, 6));
+ Assert.assertEquals((Integer)2, max.merge(5, 2));
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/MinTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/MinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/MinTest.java
new file mode 100644
index 0000000..3589735
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/MinTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.Comparator;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link Min}.
+ */
+public class MinTest
+{
+ @Test
+ public void MinTest()
+ {
+ Min<Integer> min = new Min<>();
+
+ Assert.assertEquals((Integer)3, min.accumulate(5, 3));
+ Assert.assertEquals((Integer)4, min.accumulate(4, 6));
+ Assert.assertEquals((Integer)2, min.merge(5, 2));
+
+ Comparator<Integer> com = new Comparator<Integer>()
+ {
+ @Override
+ public int compare(Integer o1, Integer o2)
+ {
+ return -(o1.compareTo(o2));
+ }
+ };
+
+ min.setComparator(com);
+ Assert.assertEquals((Integer)5, min.accumulate(5, 3));
+ Assert.assertEquals((Integer)6, min.accumulate(4, 6));
+ Assert.assertEquals((Integer)5, min.merge(5, 2));
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFnTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFnTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFnTest.java
new file mode 100644
index 0000000..26d73a7
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFnTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link ReduceFn}.
+ */
+public class ReduceFnTest
+{
+
+ @Test
+ public void ReduceFnTest()
+ {
+ ReduceFn<String> concat = new ReduceFn<String>()
+ {
+ @Override
+ public String reduce(String input1, String input2)
+ {
+ return input1 + ", " + input2;
+ }
+ };
+
+ String[] ss = new String[]{"b", "c", "d", "e"};
+ String base = "a";
+
+ for (String s : ss) {
+ base = concat.accumulate(base, s);
+ }
+ Assert.assertEquals("a, b, c, d, e", base);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicatesTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicatesTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicatesTest.java
new file mode 100644
index 0000000..674f871
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicatesTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.Set;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link RemoveDuplicates}.
+ */
+public class RemoveDuplicatesTest
+{
+ @Test
+ public void RemoveDuplicatesTest()
+ {
+ RemoveDuplicates<Integer> rd = new RemoveDuplicates<>();
+
+ Set<Integer> accu = rd.defaultAccumulatedValue();
+ Assert.assertEquals(0, accu.size());
+ Assert.assertEquals(1, rd.accumulate(accu, 10).size());
+ Assert.assertEquals(2, rd.accumulate(accu, 11).size());
+ Assert.assertEquals(2, rd.accumulate(accu, 11).size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java
new file mode 100644
index 0000000..4c55612
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.commons.lang.mutable.MutableDouble;
+import org.apache.commons.lang.mutable.MutableFloat;
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.commons.lang.mutable.MutableLong;
+
+/**
+ * Test for different Sum Accumulations.
+ */
+public class SumTest
+{
+ @Test
+ public void SumTest()
+ {
+ SumInt si = new SumInt();
+ SumLong sl = new SumLong();
+ SumFloat sf = new SumFloat();
+ SumDouble sd = new SumDouble();
+
+ Assert.assertEquals(new MutableInt(10), si.accumulate(si.defaultAccumulatedValue(), 10));
+ Assert.assertEquals(new MutableInt(11), si.accumulate(new MutableInt(1), 10));
+ Assert.assertEquals(new MutableInt(22), si.merge(new MutableInt(1), new MutableInt(21)));
+
+ Assert.assertEquals(new MutableLong(10L), sl.accumulate(sl.defaultAccumulatedValue(), 10L));
+ Assert.assertEquals(new MutableLong(22L), sl.accumulate(new MutableLong(2L), 20L));
+ Assert.assertEquals(new MutableLong(41L), sl.merge(new MutableLong(32L), new MutableLong(9L)));
+
+ Assert.assertEquals(new MutableFloat(9.0F), sf.accumulate(sf.defaultAccumulatedValue(), 9.0F));
+ Assert.assertEquals(new MutableFloat(22.5F), sf.accumulate(new MutableFloat(2.5F), 20F));
+ Assert.assertEquals(new MutableFloat(41.0F), sf.merge(new MutableFloat(33.1F), new MutableFloat(7.9F)));
+
+ Assert.assertEquals(new MutableDouble(9.0), sd.accumulate(sd.defaultAccumulatedValue(), 9.0));
+ Assert.assertEquals(new MutableDouble(22.5), sd.accumulate(new MutableDouble(2.5), 20.0));
+ Assert.assertEquals(new MutableDouble(41.0), sd.merge(new MutableDouble(33.1), new MutableDouble(7.9)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKeyTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKeyTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKeyTest.java
new file mode 100644
index 0000000..5bf2207
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKeyTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Test for {@link TopNByKey}.
+ */
+public class TopNByKeyTest
+{
+ @Test
+ public void TopNByKeyTest() throws Exception
+ {
+ TopNByKey<String, Integer> topNByKey = new TopNByKey<>();
+ topNByKey.setN(3);
+ Map<String, Integer> accu = topNByKey.defaultAccumulatedValue();
+
+ Assert.assertEquals(0, accu.size());
+
+ accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("1", 1));
+ accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("3", 3));
+
+ List<KeyValPair<String, Integer>> result1 = new ArrayList<>();
+
+ result1.add(new KeyValPair<String, Integer>("3", 3));
+ result1.add(new KeyValPair<String, Integer>("1", 1));
+
+ Assert.assertEquals(result1, topNByKey.getOutput(accu));
+
+ accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("2", 2));
+
+ List<KeyValPair<String, Integer>> result2 = new ArrayList<>();
+
+ result2.add(new KeyValPair<String, Integer>("3", 3));
+ result2.add(new KeyValPair<String, Integer>("2", 2));
+ result2.add(new KeyValPair<String, Integer>("1", 1));
+
+ Assert.assertEquals(result2, topNByKey.getOutput(accu));
+
+ accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("5", 5));
+ accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("4", 4));
+
+ List<KeyValPair<String, Integer>> result3 = new ArrayList<>();
+
+ result3.add(new KeyValPair<String, Integer>("5", 5));
+ result3.add(new KeyValPair<String, Integer>("4", 4));
+ result3.add(new KeyValPair<String, Integer>("3", 3));
+
+ Assert.assertEquals(result3, topNByKey.getOutput(accu));
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/AverageTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/AverageTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/AverageTest.java
deleted file mode 100644
index fb4de3c..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/AverageTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.commons.lang3.tuple.MutablePair;
-
-/**
- * Test for {@link Average}.
- */
-public class AverageTest
-{
- @Test
- public void AverageTest()
- {
- Average ave = new Average();
- MutablePair<Double, Long> accu = ave.defaultAccumulatedValue();
-
- for (int i = 1; i <= 10; i++) {
- accu = ave.accumulate(accu, (double)i);
- }
- Assert.assertTrue(5.5 == accu.getLeft());
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFnTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFnTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFnTest.java
deleted file mode 100644
index 4e6f8f1..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFnTest.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.apex.malhar.lib.window.Tuple;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.common.util.BaseOperator;
-
-/**
- * Test for {@link ReduceFn}.
- */
-public class FoldFnTest
-{
- public static class NumGen extends BaseOperator implements InputOperator
- {
- public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
-
- public static int count = 0;
- private int i = 0;
-
- public NumGen()
- {
- count = 0;
- i = 0;
- }
-
- @Override
- public void emitTuples()
- {
- while (i <= 7) {
- try {
- Thread.sleep(50);
- } catch (InterruptedException e) {
- // Ignore it.
- }
- count++;
- if (i >= 0) {
- output.emit(i++);
- }
- }
- i = -1;
- }
- }
-
- public static class Collector extends BaseOperator
- {
- private static int result;
-
- public transient DefaultInputPort<Tuple.WindowedTuple<Integer>> input = new DefaultInputPort<Tuple.WindowedTuple<Integer>>()
- {
- @Override
- public void process(Tuple.WindowedTuple<Integer> tuple)
- {
- result = tuple.getValue();
- }
- };
-
- public int getResult()
- {
- return result;
- }
- }
-
- public static class Plus extends FoldFn<Integer, Integer>
- {
- @Override
- public Integer merge(Integer accumulatedValue1, Integer accumulatedValue2)
- {
- return fold(accumulatedValue1, accumulatedValue2);
- }
-
- @Override
- public Integer fold(Integer input1, Integer input2)
- {
- if (input1 == null) {
- return input2;
- }
- return input1 + input2;
- }
- }
-
- @Test
- public void FoldFnTest()
- {
-
- FoldFn<String, String> concat = new FoldFn<String, String>()
- {
- @Override
- public String merge(String accumulatedValue1, String accumulatedValue2)
- {
- return fold(accumulatedValue1, accumulatedValue2);
- }
-
- @Override
- public String fold(String input1, String input2)
- {
- return input1 + ", " + input2;
- }
- };
-
- String[] ss = new String[]{"b", "c", "d", "e"};
- String base = "a";
-
- for (String s : ss) {
- base = concat.accumulate(base, s);
- }
- Assert.assertEquals("a, b, c, d, e", base);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/GroupTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/GroupTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/GroupTest.java
deleted file mode 100644
index a9aac77..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/GroupTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.List;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test for {@link Group}.
- */
-public class GroupTest
-{
- @Test
- public void GroupTest()
- {
- Group<Integer> group = new Group<>();
-
- List<Integer> accu = group.defaultAccumulatedValue();
- Assert.assertEquals(0, accu.size());
- Assert.assertEquals(1, group.accumulate(accu, 10).size());
- Assert.assertEquals(2, group.accumulate(accu, 11).size());
- Assert.assertEquals(3, group.accumulate(accu, 11).size());
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MaxTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MaxTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MaxTest.java
deleted file mode 100644
index c873125..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MaxTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.Comparator;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test for Max accumulation
- */
-public class MaxTest
-{
- @Test
- public void MaxTest()
- {
- Max<Integer> max = new Max<>();
-
- Assert.assertEquals((Integer)5, max.accumulate(5, 3));
- Assert.assertEquals((Integer)6, max.accumulate(4, 6));
- Assert.assertEquals((Integer)5, max.merge(5, 2));
-
- Comparator<Integer> com = new Comparator<Integer>()
- {
- @Override
- public int compare(Integer o1, Integer o2)
- {
- return -(o1.compareTo(o2));
- }
- };
-
- max.setComparator(com);
- Assert.assertEquals((Integer)3, max.accumulate(5, 3));
- Assert.assertEquals((Integer)4, max.accumulate(4, 6));
- Assert.assertEquals((Integer)2, max.merge(5, 2));
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MinTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MinTest.java
deleted file mode 100644
index 74816b0..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MinTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.Comparator;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test for {@link Min}.
- */
-public class MinTest
-{
- @Test
- public void MinTest()
- {
- Min<Integer> min = new Min<>();
-
- Assert.assertEquals((Integer)3, min.accumulate(5, 3));
- Assert.assertEquals((Integer)4, min.accumulate(4, 6));
- Assert.assertEquals((Integer)2, min.merge(5, 2));
-
- Comparator<Integer> com = new Comparator<Integer>()
- {
- @Override
- public int compare(Integer o1, Integer o2)
- {
- return -(o1.compareTo(o2));
- }
- };
-
- min.setComparator(com);
- Assert.assertEquals((Integer)5, min.accumulate(5, 3));
- Assert.assertEquals((Integer)6, min.accumulate(4, 6));
- Assert.assertEquals((Integer)5, min.merge(5, 2));
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFnTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFnTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFnTest.java
deleted file mode 100644
index 6b5bbad..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFnTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test for {@link ReduceFn}.
- */
-public class ReduceFnTest
-{
-
- @Test
- public void ReduceFnTest()
- {
- ReduceFn<String> concat = new ReduceFn<String>()
- {
- @Override
- public String reduce(String input1, String input2)
- {
- return input1 + ", " + input2;
- }
- };
-
- String[] ss = new String[]{"b", "c", "d", "e"};
- String base = "a";
-
- for (String s : ss) {
- base = concat.accumulate(base, s);
- }
- Assert.assertEquals("a, b, c, d, e", base);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicatesTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicatesTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicatesTest.java
deleted file mode 100644
index f0196d2..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicatesTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.Set;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test for {@link RemoveDuplicates}.
- */
-public class RemoveDuplicatesTest
-{
- @Test
- public void RemoveDuplicatesTest()
- {
- RemoveDuplicates<Integer> rd = new RemoveDuplicates<>();
-
- Set<Integer> accu = rd.defaultAccumulatedValue();
- Assert.assertEquals(0, accu.size());
- Assert.assertEquals(1, rd.accumulate(accu, 10).size());
- Assert.assertEquals(2, rd.accumulate(accu, 11).size());
- Assert.assertEquals(2, rd.accumulate(accu, 11).size());
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumTest.java
deleted file mode 100644
index 65b6480..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.commons.lang.mutable.MutableDouble;
-import org.apache.commons.lang.mutable.MutableFloat;
-import org.apache.commons.lang.mutable.MutableInt;
-import org.apache.commons.lang.mutable.MutableLong;
-
-/**
- * Test for different Sum Accumulations.
- */
-public class SumTest
-{
- @Test
- public void SumTest()
- {
- SumInt si = new SumInt();
- SumLong sl = new SumLong();
- SumFloat sf = new SumFloat();
- SumDouble sd = new SumDouble();
-
- Assert.assertEquals(new MutableInt(10), si.accumulate(si.defaultAccumulatedValue(), 10));
- Assert.assertEquals(new MutableInt(11), si.accumulate(new MutableInt(1), 10));
- Assert.assertEquals(new MutableInt(22), si.merge(new MutableInt(1), new MutableInt(21)));
-
- Assert.assertEquals(new MutableLong(10L), sl.accumulate(sl.defaultAccumulatedValue(), 10L));
- Assert.assertEquals(new MutableLong(22L), sl.accumulate(new MutableLong(2L), 20L));
- Assert.assertEquals(new MutableLong(41L), sl.merge(new MutableLong(32L), new MutableLong(9L)));
-
- Assert.assertEquals(new MutableFloat(9.0F), sf.accumulate(sf.defaultAccumulatedValue(), 9.0F));
- Assert.assertEquals(new MutableFloat(22.5F), sf.accumulate(new MutableFloat(2.5F), 20F));
- Assert.assertEquals(new MutableFloat(41.0F), sf.merge(new MutableFloat(33.1F), new MutableFloat(7.9F)));
-
- Assert.assertEquals(new MutableDouble(9.0), sd.accumulate(sd.defaultAccumulatedValue(), 9.0));
- Assert.assertEquals(new MutableDouble(22.5), sd.accumulate(new MutableDouble(2.5), 20.0));
- Assert.assertEquals(new MutableDouble(41.0), sd.merge(new MutableDouble(33.1), new MutableDouble(7.9)));
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKeyTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKeyTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKeyTest.java
deleted file mode 100644
index 3f6ac09..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKeyTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- * Unit test for TopNByKey accumulation
- */
-public class TopNByKeyTest
-{
- @Test
- public void TopNByKeyTest() throws Exception
- {
- TopNByKey<String, Integer> topNByKey = new TopNByKey<>();
- topNByKey.setN(3);
- Map<String, Integer> accu = topNByKey.defaultAccumulatedValue();
-
- Assert.assertEquals(0, accu.size());
-
- accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("1", 1));
- accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("3", 3));
-
- List<KeyValPair<String, Integer>> result1 = new ArrayList<>();
-
- result1.add(new KeyValPair<String, Integer>("3", 3));
- result1.add(new KeyValPair<String, Integer>("1", 1));
-
- Assert.assertEquals(result1, topNByKey.getOutput(accu));
-
- accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("2", 2));
-
- List<KeyValPair<String, Integer>> result2 = new ArrayList<>();
-
- result2.add(new KeyValPair<String, Integer>("3", 3));
- result2.add(new KeyValPair<String, Integer>("2", 2));
- result2.add(new KeyValPair<String, Integer>("1", 1));
-
- Assert.assertEquals(result2, topNByKey.getOutput(accu));
-
- accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("5", 5));
- accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("4", 4));
-
- List<KeyValPair<String, Integer>> result3 = new ArrayList<>();
-
- result3.add(new KeyValPair<String, Integer>("5", 5));
- result3.add(new KeyValPair<String, Integer>("4", 4));
- result3.add(new KeyValPair<String, Integer>("3", 3));
-
- Assert.assertEquals(result3, topNByKey.getOutput(accu));
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
index 84f05fc..0f5ce1e 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
@@ -25,10 +25,10 @@ import org.joda.time.Duration;
import org.apache.apex.malhar.lib.window.Accumulation;
import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.accumulation.FoldFn;
+import org.apache.apex.malhar.lib.window.accumulation.ReduceFn;
import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl;
-import org.apache.apex.malhar.lib.window.impl.accumulation.FoldFn;
-import org.apache.apex.malhar.lib.window.impl.accumulation.ReduceFn;
import org.apache.apex.malhar.stream.api.function.Function;
import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
index ebd5eea..5866a4c 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
@@ -28,14 +28,14 @@ import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.accumulation.Count;
+import org.apache.apex.malhar.lib.window.accumulation.FoldFn;
+import org.apache.apex.malhar.lib.window.accumulation.ReduceFn;
+import org.apache.apex.malhar.lib.window.accumulation.TopN;
import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl;
-import org.apache.apex.malhar.lib.window.impl.accumulation.Count;
-import org.apache.apex.malhar.lib.window.impl.accumulation.FoldFn;
-import org.apache.apex.malhar.lib.window.impl.accumulation.ReduceFn;
-import org.apache.apex.malhar.lib.window.impl.accumulation.TopN;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.Option;
[2/2] apex-malhar git commit: APEXMALHAR-2202 Moved accumulations to
org.apache.apex.malhar.lib.window.accumulation.
Posted by th...@apache.org.
APEXMALHAR-2202 Moved accumulations to org.apache.apex.malhar.lib.window.accumulation.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/2b2d5bca
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/2b2d5bca
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/2b2d5bca
Branch: refs/heads/master
Commit: 2b2d5bca91b23569e04cbc51a541126588eaab44
Parents: dcca775
Author: Shunxin <lu...@hotmail.com>
Authored: Thu Aug 25 13:37:46 2016 -0700
Committer: Shunxin <lu...@hotmail.com>
Committed: Thu Aug 25 13:37:46 2016 -0700
----------------------------------------------------------------------
.../sample/complete/TopWikipediaSessions.java | 2 +-
.../stream/sample/complete/TrafficRoutes.java | 2 +-
.../sample/cookbook/CombinePerKeyExamples.java | 2 +-
.../stream/sample/cookbook/DeDupExample.java | 2 +-
.../sample/cookbook/MaxPerKeyExamples.java | 2 +-
.../malhar/lib/window/accumulation/Average.java | 64 +++++++++
.../malhar/lib/window/accumulation/Count.java | 61 +++++++++
.../malhar/lib/window/accumulation/FoldFn.java | 65 ++++++++++
.../malhar/lib/window/accumulation/Group.java | 63 +++++++++
.../malhar/lib/window/accumulation/Max.java | 75 +++++++++++
.../malhar/lib/window/accumulation/Min.java | 76 +++++++++++
.../lib/window/accumulation/ReduceFn.java | 65 ++++++++++
.../window/accumulation/RemoveDuplicates.java | 72 +++++++++++
.../lib/window/accumulation/SumDouble.java | 60 +++++++++
.../lib/window/accumulation/SumFloat.java | 60 +++++++++
.../malhar/lib/window/accumulation/SumInt.java | 60 +++++++++
.../malhar/lib/window/accumulation/SumLong.java | 60 +++++++++
.../malhar/lib/window/accumulation/TopN.java | 106 +++++++++++++++
.../lib/window/accumulation/TopNByKey.java | 114 ++++++++++++++++
.../lib/window/impl/accumulation/Average.java | 64 ---------
.../lib/window/impl/accumulation/Count.java | 61 ---------
.../lib/window/impl/accumulation/FoldFn.java | 65 ----------
.../lib/window/impl/accumulation/Group.java | 63 ---------
.../lib/window/impl/accumulation/Max.java | 75 -----------
.../lib/window/impl/accumulation/Min.java | 76 -----------
.../lib/window/impl/accumulation/ReduceFn.java | 65 ----------
.../impl/accumulation/RemoveDuplicates.java | 72 -----------
.../lib/window/impl/accumulation/SumDouble.java | 60 ---------
.../lib/window/impl/accumulation/SumFloat.java | 60 ---------
.../lib/window/impl/accumulation/SumInt.java | 60 ---------
.../lib/window/impl/accumulation/SumLong.java | 60 ---------
.../lib/window/impl/accumulation/TopN.java | 106 ---------------
.../lib/window/impl/accumulation/TopNByKey.java | 114 ----------------
.../lib/window/accumulation/AverageTest.java | 41 ++++++
.../lib/window/accumulation/FoldFnTest.java | 129 +++++++++++++++++++
.../lib/window/accumulation/GroupTest.java | 41 ++++++
.../malhar/lib/window/accumulation/MaxTest.java | 53 ++++++++
.../malhar/lib/window/accumulation/MinTest.java | 53 ++++++++
.../lib/window/accumulation/ReduceFnTest.java | 50 +++++++
.../accumulation/RemoveDuplicatesTest.java | 41 ++++++
.../malhar/lib/window/accumulation/SumTest.java | 57 ++++++++
.../lib/window/accumulation/TopNByKeyTest.java | 74 +++++++++++
.../window/impl/accumulation/AverageTest.java | 41 ------
.../window/impl/accumulation/FoldFnTest.java | 129 -------------------
.../lib/window/impl/accumulation/GroupTest.java | 42 ------
.../lib/window/impl/accumulation/MaxTest.java | 53 --------
.../lib/window/impl/accumulation/MinTest.java | 53 --------
.../window/impl/accumulation/ReduceFnTest.java | 50 -------
.../impl/accumulation/RemoveDuplicatesTest.java | 42 ------
.../lib/window/impl/accumulation/SumTest.java | 57 --------
.../window/impl/accumulation/TopNByKeyTest.java | 75 -----------
.../apex/malhar/stream/api/WindowedStream.java | 4 +-
.../stream/api/impl/ApexWindowedStreamImpl.java | 8 +-
53 files changed, 1551 insertions(+), 1554 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
index de4e590..f2e70b1 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
@@ -29,7 +29,7 @@ import org.joda.time.Duration;
import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.lib.window.impl.accumulation.TopN;
+import org.apache.apex.malhar.lib.window.accumulation.TopN;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
import org.apache.apex.malhar.stream.api.WindowedStream;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
index 2cc04d1..26a2823 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
@@ -32,7 +32,7 @@ import org.joda.time.Duration;
import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.lib.window.impl.accumulation.Group;
+import org.apache.apex.malhar.lib.window.accumulation.Group;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
import org.apache.apex.malhar.stream.api.WindowedStream;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
index ecd71ae..d88a8dc 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
@@ -24,7 +24,7 @@ import java.util.List;
import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.lib.window.impl.accumulation.ReduceFn;
+import org.apache.apex.malhar.lib.window.accumulation.ReduceFn;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
import org.apache.apex.malhar.stream.api.WindowedStream;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
index 53426f3..2930010 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
@@ -26,7 +26,7 @@ import org.joda.time.Duration;
import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.lib.window.impl.accumulation.RemoveDuplicates;
+import org.apache.apex.malhar.lib.window.accumulation.RemoveDuplicates;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.function.Function;
import org.apache.apex.malhar.stream.api.impl.StreamFactory;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
index 97b2696..02980e4 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
@@ -24,7 +24,7 @@ import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.Window;
import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.lib.window.impl.accumulation.Max;
+import org.apache.apex.malhar.lib.window.accumulation.Max;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
import org.apache.apex.malhar.stream.api.WindowedStream;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java
new file mode 100644
index 0000000..c669439
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang3.tuple.MutablePair;
+
+/**
+ * Accumulation that maintains a running average of {@code Double} inputs.
+ *
+ * The accumulated value is a pair: the left element is the current mean and the right
+ * element is the number of inputs that have been folded into that mean.
+ */
+public class Average implements Accumulation<Double, MutablePair<Double, Long>, Double>
+{
+  /**
+   * Creates the starting accumulator.
+   *
+   * @return a new pair holding mean 0.0 and count 0
+   */
+  @Override
+  public MutablePair<Double, Long> defaultAccumulatedValue()
+  {
+    return new MutablePair<>(0.0, 0L);
+  }
+
+  /**
+   * Folds one input into the running mean. The accumulator is updated in place.
+   *
+   * @param accu accumulator to update
+   * @param input value to include in the average
+   * @return the same {@code accu} instance, now covering one more input
+   */
+  @Override
+  public MutablePair<Double, Long> accumulate(MutablePair<Double, Long> accu, Double input)
+  {
+    final long newCount = accu.getRight() + 1;
+    // Re-weight the previous mean by oldCount/newCount and add the new input's share.
+    accu.setLeft(accu.getLeft() * ((double)accu.getRight() / newCount) + input / newCount);
+    accu.setRight(newCount);
+    return accu;
+  }
+
+  /**
+   * Combines two partial averages into the count-weighted mean of both.
+   * The result is written into {@code accu1}; {@code accu2} is not modified.
+   *
+   * @param accu1 first partial average, updated in place
+   * @param accu2 second partial average
+   * @return the same {@code accu1} instance holding the merged mean and total count
+   */
+  @Override
+  public MutablePair<Double, Long> merge(MutablePair<Double, Long> accu1, MutablePair<Double, Long> accu2)
+  {
+    final long total = accu1.getRight() + accu2.getRight();
+    if (total == 0) {
+      // Neither side has seen any input; weighting by 0/0 would produce NaN.
+      return accu1;
+    }
+    // Each mean is weighted by its share of the combined count. The divisor must be the
+    // whole sum (n1 + n2); dividing by n1 alone and then adding n2 gives a wrong weight.
+    accu1.setLeft(accu1.getLeft() * ((double)accu1.getRight() / total) +
+        accu2.getLeft() * ((double)accu2.getRight() / total));
+    accu1.setRight(total);
+    return accu1;
+  }
+
+  /**
+   * @param accumulatedValue accumulator to read
+   * @return the current mean stored in the accumulator
+   */
+  @Override
+  public Double getOutput(MutablePair<Double, Long> accumulatedValue)
+  {
+    return accumulatedValue.getLeft();
+  }
+
+  /**
+   * Retraction is not implemented yet.
+   *
+   * @param value previously emitted output (currently ignored)
+   * @return always 0.0
+   */
+  @Override
+  public Double getRetraction(Double value)
+  {
+    // TODO: Need to add implementation for retraction.
+    return 0.0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
new file mode 100644
index 0000000..180152b
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang3.mutable.MutableLong;
+
+/**
+ * Accumulation that keeps a running {@code long} total of its {@code Long} inputs
+ * in a {@link MutableLong}.
+ */
+public class Count implements Accumulation<Long, MutableLong, Long>
+{
+
+  /**
+   * @return a new accumulator starting at zero
+   */
+  @Override
+  public MutableLong defaultAccumulatedValue()
+  {
+    return new MutableLong(0);
+  }
+
+  /**
+   * Adds {@code input} to the running total, updating the accumulator in place.
+   *
+   * @param accumulatedValue running total to update
+   * @param input amount to add
+   * @return the same {@code accumulatedValue} instance
+   */
+  @Override
+  public MutableLong accumulate(MutableLong accumulatedValue, Long input)
+  {
+    final long increment = input.longValue();
+    accumulatedValue.add(increment);
+    return accumulatedValue;
+  }
+
+  /**
+   * Adds the second total into the first; the second is left untouched.
+   *
+   * @param accumulatedValue1 total that receives the sum
+   * @param accumulatedValue2 total to add
+   * @return the same {@code accumulatedValue1} instance
+   */
+  @Override
+  public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2)
+  {
+    final long other = accumulatedValue2.longValue();
+    accumulatedValue1.add(other);
+    return accumulatedValue1;
+  }
+
+  /**
+   * @param accumulatedValue running total to read
+   * @return the current total as a boxed {@code Long}
+   */
+  @Override
+  public Long getOutput(MutableLong accumulatedValue)
+  {
+    return Long.valueOf(accumulatedValue.longValue());
+  }
+
+  /**
+   * @param value previously emitted output
+   * @return the arithmetic negation of {@code value}
+   */
+  @Override
+  public Long getRetraction(Long value)
+  {
+    final long negated = -value.longValue();
+    return negated;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/FoldFn.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/FoldFn.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/FoldFn.java
new file mode 100644
index 0000000..dc344b3
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/FoldFn.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * Adaptor that expresses an {@link Accumulation} as a fold: every input is combined
+ * with the running result through {@link #fold}, and the running result itself is
+ * what gets emitted. This class does not define {@code merge}; concrete subclasses
+ * have to provide it together with {@link #fold}.
+ *
+ * @param <INPUT> type of the incoming values
+ * @param <OUTPUT> type of both the accumulated state and the emitted result
+ */
+public abstract class FoldFn<INPUT, OUTPUT> implements Accumulation<INPUT, OUTPUT, OUTPUT>
+{
+
+  // Seed returned by defaultAccumulatedValue(); stays null when the no-arg constructor is used.
+  private OUTPUT initialVal;
+
+  /**
+   * Creates a fold whose initial accumulated value is {@code null}.
+   */
+  public FoldFn()
+  {
+  }
+
+  /**
+   * Creates a fold that starts from the given value.
+   *
+   * @param initialVal value returned by {@link #defaultAccumulatedValue()}
+   */
+  public FoldFn(OUTPUT initialVal)
+  {
+    this.initialVal = initialVal;
+  }
+
+  /**
+   * Returns the seed passed to the constructor (or {@code null} if none was given).
+   *
+   * NOTE(review): the same reference is handed out on every call, so a mutable seed
+   * would be shared by every accumulation that starts from it.
+   */
+  @Override
+  public OUTPUT defaultAccumulatedValue()
+  {
+    return this.initialVal;
+  }
+
+  /**
+   * Combines the running result with one more input by delegating to {@link #fold}.
+   *
+   * @param accumulatedValue running result
+   * @param input next value to fold in
+   * @return whatever {@link #fold} returns
+   */
+  @Override
+  public OUTPUT accumulate(OUTPUT accumulatedValue, INPUT input)
+  {
+    final OUTPUT folded = fold(accumulatedValue, input);
+    return folded;
+  }
+
+  /**
+   * The accumulated state is already the output, so it is returned unchanged.
+   */
+  @Override
+  public OUTPUT getOutput(OUTPUT accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+
+  /**
+   * Retraction is not provided by this adaptor; always returns {@code null}.
+   */
+  @Override
+  public OUTPUT getRetraction(OUTPUT value)
+  {
+    return null;
+  }
+
+  /**
+   * Folds one input into the running result.
+   *
+   * NOTE(review): this method is package-private, so only subclasses inside this
+   * package can implement it; widening it would be a signature change.
+   *
+   * @param result running result so far
+   * @param input value to fold in
+   * @return the new running result
+   */
+  abstract OUTPUT fold(OUTPUT result, INPUT input);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java
new file mode 100644
index 0000000..c34a8ac
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * Accumulation that collects every input into a single list and emits that list.
+ *
+ * @param <T> element type
+ */
+public class Group<T> implements Accumulation<T, List<T>, List<T>>
+{
+  /**
+   * Starts each accumulation with a new, empty {@link ArrayList}.
+   */
+  @Override
+  public List<T> defaultAccumulatedValue()
+  {
+    return new ArrayList<T>();
+  }
+
+  /**
+   * Appends the input to the group, mutating the list in place.
+   *
+   * @param accumulatedValue group collected so far, updated in place
+   * @param input element to append
+   * @return the same {@code accumulatedValue} instance
+   */
+  @Override
+  public List<T> accumulate(List<T> accumulatedValue, T input)
+  {
+    accumulatedValue.add(input);
+    return accumulatedValue;
+  }
+
+  /**
+   * Appends all elements of the second group to the first one.
+   *
+   * @param accumulatedValue1 group that receives the elements, updated in place
+   * @param accumulatedValue2 group whose elements are appended
+   * @return the same {@code accumulatedValue1} instance
+   */
+  @Override
+  public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2)
+  {
+    accumulatedValue1.addAll(accumulatedValue2);
+    return accumulatedValue1;
+  }
+
+  /**
+   * The accumulated list is emitted as-is (no defensive copy is made).
+   */
+  @Override
+  public List<T> getOutput(List<T> accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+
+  /**
+   * Placeholder: currently ignores {@code value} and returns a new empty list.
+   */
+  @Override
+  public List<T> getRetraction(List<T> value)
+  {
+    // TODO: Need to add implementation for retraction.
+    return new ArrayList<T>();
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java
new file mode 100644
index 0000000..4164bb2
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.Comparator;
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * Max accumulation: keeps the largest value seen so far.
+ *
+ * Values are ordered by the configured {@link Comparator} when one is set; otherwise
+ * the input must implement {@link Comparable}. A {@code null} accumulated value
+ * means "nothing seen yet".
+ *
+ * @param <T> value type
+ */
+public class Max<T> implements Accumulation<T, T, T>
+{
+
+  // Optional explicit ordering; when null the natural ordering of the inputs is used.
+  Comparator<T> comparator;
+
+  /**
+   * Sets the ordering used to decide which value is larger.
+   *
+   * @param comparator ordering to use, or {@code null} to fall back to natural ordering
+   */
+  public void setComparator(Comparator<T> comparator)
+  {
+    this.comparator = comparator;
+  }
+
+  /**
+   * Returns {@code null}, the marker for an accumulation that has seen no input.
+   */
+  @Override
+  public T defaultAccumulatedValue()
+  {
+    return null;
+  }
+
+  /**
+   * Returns the larger of the current maximum and the input.
+   *
+   * @param accumulatedValue current maximum, or {@code null} if nothing was seen yet
+   * @param input candidate value
+   * @return {@code input} if it is strictly greater (or there is no maximum yet),
+   *         otherwise {@code accumulatedValue}
+   * @throws RuntimeException if no comparator is set and {@code input} is not {@link Comparable}
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public T accumulate(T accumulatedValue, T input)
+  {
+    if (accumulatedValue == null) {
+      return input;
+    }
+    if (comparator != null) {
+      return (comparator.compare(input, accumulatedValue) > 0) ? input : accumulatedValue;
+    }
+    if (input instanceof Comparable) {
+      // Unchecked: T is unbounded, so comparability can only be established at runtime.
+      return (((Comparable<T>)input).compareTo(accumulatedValue) > 0) ? input : accumulatedValue;
+    }
+    throw new RuntimeException("Tuple cannot be compared");
+  }
+
+  /**
+   * Combines two partial maxima.
+   *
+   * An empty second partial ({@code null}, i.e. the default accumulated value) leaves
+   * the first one untouched instead of being pushed through the comparison, where it
+   * would fail with "Tuple cannot be compared" or reach the comparator as {@code null}.
+   *
+   * @param accumulatedValue1 first partial maximum, may be {@code null}
+   * @param accumulatedValue2 second partial maximum, may be {@code null}
+   * @return the larger of the two, or the non-null one if either is empty
+   */
+  @Override
+  public T merge(T accumulatedValue1, T accumulatedValue2)
+  {
+    if (accumulatedValue2 == null) {
+      return accumulatedValue1;
+    }
+    return accumulate(accumulatedValue1, accumulatedValue2);
+  }
+
+  /**
+   * The accumulated maximum is emitted unchanged.
+   */
+  @Override
+  public T getOutput(T accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+
+  /**
+   * Placeholder: retraction is not implemented and always yields {@code null}.
+   */
+  @Override
+  public T getRetraction(T value)
+  {
+    // TODO: Need to add implementation for retraction.
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java
new file mode 100644
index 0000000..0460052
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.Comparator;
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * Min accumulation: keeps the smallest value seen so far.
+ *
+ * Values are ordered by the configured {@link Comparator} when one is set; otherwise
+ * the input must implement {@link Comparable}. A {@code null} accumulated value
+ * means "nothing seen yet".
+ *
+ * @param <T> value type
+ */
+public class Min<T> implements Accumulation<T, T, T>
+{
+
+  // Optional explicit ordering; when null the natural ordering of the inputs is used.
+  Comparator<T> comparator;
+
+  /**
+   * Sets the ordering used to decide which value is smaller.
+   *
+   * @param comparator ordering to use, or {@code null} to fall back to natural ordering
+   */
+  public void setComparator(Comparator<T> comparator)
+  {
+    this.comparator = comparator;
+  }
+
+  /**
+   * Returns {@code null}, the marker for an accumulation that has seen no input.
+   */
+  @Override
+  public T defaultAccumulatedValue()
+  {
+    return null;
+  }
+
+  /**
+   * Returns the smaller of the current minimum and the input.
+   *
+   * @param accumulatedValue current minimum, or {@code null} if nothing was seen yet
+   * @param input candidate value
+   * @return {@code input} if it is strictly smaller (or there is no minimum yet),
+   *         otherwise {@code accumulatedValue}
+   * @throws RuntimeException if no comparator is set and {@code input} is not {@link Comparable}
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public T accumulate(T accumulatedValue, T input)
+  {
+    if (accumulatedValue == null) {
+      return input;
+    }
+    if (comparator != null) {
+      return (comparator.compare(input, accumulatedValue) < 0) ? input : accumulatedValue;
+    }
+    if (input instanceof Comparable) {
+      // Unchecked: T is unbounded, so comparability can only be established at runtime.
+      return (((Comparable<T>)input).compareTo(accumulatedValue) < 0) ? input : accumulatedValue;
+    }
+    throw new RuntimeException("Tuple cannot be compared");
+  }
+
+  /**
+   * Combines two partial minima.
+   *
+   * An empty second partial ({@code null}, i.e. the default accumulated value) leaves
+   * the first one untouched instead of being pushed through the comparison, where it
+   * would fail with "Tuple cannot be compared" or reach the comparator as {@code null}.
+   *
+   * @param accumulatedValue1 first partial minimum, may be {@code null}
+   * @param accumulatedValue2 second partial minimum, may be {@code null}
+   * @return the smaller of the two, or the non-null one if either is empty
+   */
+  @Override
+  public T merge(T accumulatedValue1, T accumulatedValue2)
+  {
+    if (accumulatedValue2 == null) {
+      return accumulatedValue1;
+    }
+    return accumulate(accumulatedValue1, accumulatedValue2);
+  }
+
+  /**
+   * The accumulated minimum is emitted unchanged.
+   */
+  @Override
+  public T getOutput(T accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+
+  /**
+   * Placeholder: retraction is not implemented and always yields {@code null}.
+   */
+  @Override
+  public T getRetraction(T value)
+  {
+    // TODO: Need to add implementation for retraction.
+    return null;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFn.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFn.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFn.java
new file mode 100644
index 0000000..0c34c75
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFn.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * An easy to use reduce Accumulation: subclasses only supply {@link #reduce}, which
+ * combines two values of the same type into one.
+ *
+ * A {@code null} accumulated value means "nothing seen yet"; it is never passed to
+ * {@link #reduce} as the accumulated side by {@link #accumulate} or {@link #merge}.
+ *
+ * @param <INPUT> type of the inputs, the accumulated state and the output
+ */
+public abstract class ReduceFn<INPUT> implements Accumulation<INPUT, INPUT, INPUT>
+{
+  /**
+   * Returns {@code null}, the marker for an accumulation that has seen no input.
+   */
+  @Override
+  public INPUT defaultAccumulatedValue()
+  {
+    return null;
+  }
+
+  /**
+   * Reduces the input into the accumulated value.
+   *
+   * @param accumulatedValue value reduced so far, or {@code null} if this is the first input
+   * @param input next value
+   * @return {@code input} for the first value, otherwise {@code reduce(accumulatedValue, input)}
+   */
+  @Override
+  public INPUT accumulate(INPUT accumulatedValue, INPUT input)
+  {
+    if (accumulatedValue == null) {
+      return input;
+    }
+    return reduce(accumulatedValue, input);
+  }
+
+  /**
+   * Combines two partial results.
+   *
+   * Either side may still be the empty default ({@code null}); in that case the other
+   * side is returned as-is, mirroring the guard in {@link #accumulate}, so that
+   * {@link #reduce} is only ever called with two real values.
+   *
+   * @param accumulatedValue1 first partial result, may be {@code null}
+   * @param accumulatedValue2 second partial result, may be {@code null}
+   * @return the reduction of both, or the non-null one if either is empty
+   */
+  @Override
+  public INPUT merge(INPUT accumulatedValue1, INPUT accumulatedValue2)
+  {
+    if (accumulatedValue1 == null) {
+      return accumulatedValue2;
+    }
+    if (accumulatedValue2 == null) {
+      return accumulatedValue1;
+    }
+    return reduce(accumulatedValue1, accumulatedValue2);
+  }
+
+  /**
+   * The accumulated value is emitted unchanged.
+   */
+  @Override
+  public INPUT getOutput(INPUT accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+
+  /**
+   * Retraction is not provided by this adaptor; always returns {@code null}.
+   */
+  @Override
+  public INPUT getRetraction(INPUT value)
+  {
+    return null;
+  }
+
+  /**
+   * Combines two values into one.
+   *
+   * @param input1 first value (the accumulated side)
+   * @param input2 second value
+   * @return the combined value
+   */
+  public abstract INPUT reduce(INPUT input1, INPUT input2);
+
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java
new file mode 100644
index 0000000..8eee868
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * RemoveDuplicates Accumulation: gathers inputs in a {@link Set}, so equal elements
+ * are kept only once, and emits the distinct elements as a list.
+ *
+ * @param <T> element type
+ */
+public class RemoveDuplicates<T> implements Accumulation<T, Set<T>, List<T>>
+{
+  /**
+   * Starts each accumulation with a new, empty {@link HashSet}.
+   */
+  @Override
+  public Set<T> defaultAccumulatedValue()
+  {
+    return new HashSet<T>();
+  }
+
+  /**
+   * Adds the input to the set; an element equal to one already present has no effect.
+   *
+   * @param accumulatedValue distinct elements so far, updated in place
+   * @param input element to add
+   * @return the same {@code accumulatedValue} instance
+   */
+  @Override
+  public Set<T> accumulate(Set<T> accumulatedValue, T input)
+  {
+    accumulatedValue.add(input);
+    return accumulatedValue;
+  }
+
+  /**
+   * Adds every element of the second set to the first one.
+   *
+   * @param accumulatedValue1 set that receives the elements, updated in place
+   * @param accumulatedValue2 set whose elements are added
+   * @return the same {@code accumulatedValue1} instance
+   */
+  @Override
+  public Set<T> merge(Set<T> accumulatedValue1, Set<T> accumulatedValue2)
+  {
+    accumulatedValue1.addAll(accumulatedValue2);
+    return accumulatedValue1;
+  }
+
+  /**
+   * Copies the distinct elements into a new list.
+   *
+   * @param accumulatedValue distinct elements, may be {@code null}
+   * @return a new list with the set's elements, empty when the set is {@code null}
+   */
+  @Override
+  public List<T> getOutput(Set<T> accumulatedValue)
+  {
+    return (accumulatedValue != null) ? new ArrayList<T>(accumulatedValue) : new ArrayList<T>();
+  }
+
+  /**
+   * Placeholder: currently returns a copy of the previously emitted list.
+   */
+  @Override
+  public List<T> getRetraction(List<T> value)
+  {
+    // TODO: Need to add implementation for retraction.
+    final List<T> copy = new ArrayList<T>(value);
+    return copy;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
new file mode 100644
index 0000000..fafff43
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang.mutable.MutableDouble;
+
+/**
+ * Sum Accumulation for doubles: keeps the running total in a {@link MutableDouble}.
+ */
+public class SumDouble implements Accumulation<Double, MutableDouble, Double>
+{
+  /**
+   * Starts each accumulation with a new total of {@code 0.0}.
+   */
+  @Override
+  public MutableDouble defaultAccumulatedValue()
+  {
+    return new MutableDouble(0.0d);
+  }
+
+  /**
+   * Adds the input to the running total, mutating it in place.
+   *
+   * @param accumulatedValue running total, updated in place
+   * @param input value to add
+   * @return the same {@code accumulatedValue} instance
+   */
+  @Override
+  public MutableDouble accumulate(MutableDouble accumulatedValue, Double input)
+  {
+    accumulatedValue.add(input);
+    return accumulatedValue;
+  }
+
+  /**
+   * Adds the second total to the first one.
+   *
+   * @param accumulatedValue1 total that receives the sum, updated in place
+   * @param accumulatedValue2 total to add; left unchanged
+   * @return the same {@code accumulatedValue1} instance
+   */
+  @Override
+  public MutableDouble merge(MutableDouble accumulatedValue1, MutableDouble accumulatedValue2)
+  {
+    accumulatedValue1.add(accumulatedValue2);
+    return accumulatedValue1;
+  }
+
+  /**
+   * Converts the mutable total into the boxed value that is emitted.
+   */
+  @Override
+  public Double getOutput(MutableDouble accumulatedValue)
+  {
+    return Double.valueOf(accumulatedValue.doubleValue());
+  }
+
+  /**
+   * Produces the retraction for a previously emitted sum: its negation.
+   */
+  @Override
+  public Double getRetraction(Double value)
+  {
+    final double negated = -value.doubleValue();
+    return negated;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
new file mode 100644
index 0000000..2e2d3ef
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang.mutable.MutableFloat;
+
+/**
+ * Sum Accumulation for floats: keeps the running total in a {@link MutableFloat}.
+ */
+public class SumFloat implements Accumulation<Float, MutableFloat, Float>
+{
+  /**
+   * Starts each accumulation with a new total of zero.
+   */
+  @Override
+  public MutableFloat defaultAccumulatedValue()
+  {
+    // A float literal selects the primitive constructor; the former double literal
+    // (0.) could only match the Number overload after being boxed to a Double.
+    return new MutableFloat(0.0f);
+  }
+
+  /**
+   * Adds the input to the running total, mutating it in place.
+   *
+   * @param accumulatedValue running total, updated in place
+   * @param input value to add
+   * @return the same {@code accumulatedValue} instance
+   */
+  @Override
+  public MutableFloat accumulate(MutableFloat accumulatedValue, Float input)
+  {
+    accumulatedValue.add(input);
+    return accumulatedValue;
+  }
+
+  /**
+   * Adds the second total to the first one.
+   *
+   * @param accumulatedValue1 total that receives the sum, updated in place
+   * @param accumulatedValue2 total to add; left unchanged
+   * @return the same {@code accumulatedValue1} instance
+   */
+  @Override
+  public MutableFloat merge(MutableFloat accumulatedValue1, MutableFloat accumulatedValue2)
+  {
+    accumulatedValue1.add(accumulatedValue2);
+    return accumulatedValue1;
+  }
+
+  /**
+   * Converts the mutable total into the boxed value that is emitted.
+   */
+  @Override
+  public Float getOutput(MutableFloat accumulatedValue)
+  {
+    return accumulatedValue.floatValue();
+  }
+
+  /**
+   * Produces the retraction for a previously emitted sum: its negation.
+   */
+  @Override
+  public Float getRetraction(Float value)
+  {
+    return -value;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
new file mode 100644
index 0000000..c9aa4bd
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang.mutable.MutableInt;
+
+/**
+ * Sum accumulation for integers: keeps the running total in a {@link MutableInt}.
+ */
+public class SumInt implements Accumulation<Integer, MutableInt, Integer>
+{
+  /**
+   * Starts each accumulation with a new total of zero.
+   */
+  @Override
+  public MutableInt defaultAccumulatedValue()
+  {
+    final MutableInt zero = new MutableInt(0);
+    return zero;
+  }
+
+  /**
+   * Adds the input to the running total, mutating it in place.
+   *
+   * @param accumulatedValue running total, updated in place
+   * @param input value to add
+   * @return the same {@code accumulatedValue} instance
+   */
+  @Override
+  public MutableInt accumulate(MutableInt accumulatedValue, Integer input)
+  {
+    accumulatedValue.add(input);
+    return accumulatedValue;
+  }
+
+  /**
+   * Adds the second total to the first one.
+   *
+   * @param accumulatedValue1 total that receives the sum, updated in place
+   * @param accumulatedValue2 total to add; left unchanged
+   * @return the same {@code accumulatedValue1} instance
+   */
+  @Override
+  public MutableInt merge(MutableInt accumulatedValue1, MutableInt accumulatedValue2)
+  {
+    accumulatedValue1.add(accumulatedValue2);
+    return accumulatedValue1;
+  }
+
+  /**
+   * Converts the mutable total into the boxed value that is emitted.
+   */
+  @Override
+  public Integer getOutput(MutableInt accumulatedValue)
+  {
+    return Integer.valueOf(accumulatedValue.intValue());
+  }
+
+  /**
+   * Produces the retraction for a previously emitted sum: its negation.
+   */
+  @Override
+  public Integer getRetraction(Integer value)
+  {
+    final int negated = -value.intValue();
+    return negated;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
new file mode 100644
index 0000000..1268141
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang.mutable.MutableLong;
+
+/**
+ * Sum accumulation for longs: keeps the running total in a {@link MutableLong}.
+ */
+public class SumLong implements Accumulation<Long, MutableLong, Long>
+{
+  /**
+   * Starts each accumulation with a new total of zero.
+   */
+  @Override
+  public MutableLong defaultAccumulatedValue()
+  {
+    final MutableLong zero = new MutableLong(0L);
+    return zero;
+  }
+
+  /**
+   * Adds the input to the running total, mutating it in place.
+   *
+   * @param accumulatedValue running total, updated in place
+   * @param input value to add
+   * @return the same {@code accumulatedValue} instance
+   */
+  @Override
+  public MutableLong accumulate(MutableLong accumulatedValue, Long input)
+  {
+    accumulatedValue.add(input);
+    return accumulatedValue;
+  }
+
+  /**
+   * Adds the second total to the first one.
+   *
+   * @param accumulatedValue1 total that receives the sum, updated in place
+   * @param accumulatedValue2 total to add; left unchanged
+   * @return the same {@code accumulatedValue1} instance
+   */
+  @Override
+  public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2)
+  {
+    accumulatedValue1.add(accumulatedValue2);
+    return accumulatedValue1;
+  }
+
+  /**
+   * Converts the mutable total into the boxed value that is emitted.
+   */
+  @Override
+  public Long getOutput(MutableLong accumulatedValue)
+  {
+    return Long.valueOf(accumulatedValue.longValue());
+  }
+
+  /**
+   * Produces the retraction for a previously emitted sum: its negation.
+   */
+  @Override
+  public Long getRetraction(Long value)
+  {
+    final long negated = -value.longValue();
+    return negated;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopN.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopN.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopN.java
new file mode 100644
index 0000000..c95cbb4
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopN.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * TopN accumulation
+ */
+public class TopN<T> implements Accumulation<T, List<T>, List<T>>
+{
+ int n;
+
+ Comparator<T> comparator;
+
+ public void setN(int n)
+ {
+ this.n = n;
+ }
+
+ public void setComparator(Comparator<T> comparator)
+ {
+ this.comparator = comparator;
+ }
+
+ @Override
+ public List<T> defaultAccumulatedValue()
+ {
+ return new LinkedList<>();
+ }
+
+ @Override
+ public List<T> accumulate(List<T> accumulatedValue, T input)
+ {
+ int k = 0;
+ for (T inMemory : accumulatedValue) {
+ if (comparator != null) {
+ if (comparator.compare(inMemory, input) < 0) {
+ break;
+ }
+ } else if (input instanceof Comparable) {
+ if (((Comparable<T>)input).compareTo(inMemory) > 0) {
+ break;
+ }
+ } else {
+ throw new RuntimeException("Tuple cannot be compared");
+ }
+ k++;
+ }
+ accumulatedValue.add(k, input);
+ if (accumulatedValue.size() > n) {
+ accumulatedValue.remove(accumulatedValue.get(accumulatedValue.size() - 1));
+ }
+ return accumulatedValue;
+ }
+
+ @Override
+ public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2)
+ {
+ accumulatedValue1.addAll(accumulatedValue2);
+ if (comparator != null) {
+ Collections.sort(accumulatedValue1, Collections.reverseOrder(comparator));
+ } else {
+ Collections.sort(accumulatedValue1, Collections.reverseOrder());
+ }
+ if (accumulatedValue1.size() > n) {
+ return accumulatedValue1.subList(0, n);
+ } else {
+ return accumulatedValue1;
+ }
+ }
+
+ @Override
+ public List<T> getOutput(List<T> accumulatedValue)
+ {
+ return accumulatedValue;
+ }
+
+ @Override
+ public List<T> getRetraction(List<T> accumulatedValue)
+ {
+ return new LinkedList<>();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKey.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKey.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKey.java
new file mode 100644
index 0000000..92a6eb6
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKey.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Generalized TopNByKey accumulation: tracks one value per key and emits the
+ * {@code n} key/value pairs with the largest values, largest first.
+ *
+ * Values are ordered by the configured {@link Comparator} when one is set; otherwise
+ * by their natural ordering if they implement {@link Comparable}. Values that can be
+ * ordered by neither are never treated as greater than another value.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class TopNByKey<K, V> implements
+    Accumulation<KeyValPair<K, V>, Map<K, V>, List<KeyValPair<K, V>>>
+{
+  // Maximum number of pairs emitted by getOutput.
+  int n = 10;
+
+  // Optional explicit ordering of values; when null the natural ordering is used.
+  Comparator<V> comparator;
+
+  /**
+   * Sets how many pairs are emitted.
+   *
+   * @param n maximum size of the output list
+   */
+  public void setN(int n)
+  {
+    this.n = n;
+  }
+
+  /**
+   * Sets the ordering used to rank values.
+   *
+   * @param comparator ordering to use, or {@code null} to fall back to natural ordering
+   */
+  public void setComparator(Comparator<V> comparator)
+  {
+    this.comparator = comparator;
+  }
+
+  /**
+   * Starts each accumulation with a new, empty {@link HashMap}.
+   */
+  @Override
+  public Map<K, V> defaultAccumulatedValue()
+  {
+    return new HashMap<>();
+  }
+
+  /**
+   * Records the input's value under its key, replacing any value already stored for that key.
+   *
+   * @param accumulatedValue key-to-value state, updated in place
+   * @param input pair to record
+   * @return the same {@code accumulatedValue} instance
+   */
+  @Override
+  public Map<K, V> accumulate(Map<K, V> accumulatedValue, KeyValPair<K, V> input)
+  {
+    accumulatedValue.put(input.getKey(), input.getValue());
+    return accumulatedValue;
+  }
+
+  /**
+   * Merges the second map into the first: keys missing from the first are copied over,
+   * and for keys present in both the greater value is kept.
+   *
+   * @param accumulatedValue1 map that receives the entries, updated in place
+   * @param accumulatedValue2 map whose entries are merged in
+   * @return the same {@code accumulatedValue1} instance
+   */
+  @Override
+  public Map<K, V> merge(Map<K, V> accumulatedValue1, Map<K, V> accumulatedValue2)
+  {
+    for (Map.Entry<K, V> entry : accumulatedValue2.entrySet()) {
+      K key = entry.getKey();
+      V candidate = entry.getValue();
+      if (!accumulatedValue1.containsKey(key) || isGreater(candidate, accumulatedValue1.get(key))) {
+        accumulatedValue1.put(key, candidate);
+      }
+    }
+    return accumulatedValue1;
+  }
+
+  /**
+   * Builds the list of the {@code n} pairs with the largest values, in descending order.
+   *
+   * @param accumulatedValue key-to-value state
+   * @return a new list holding at most {@code n} pairs
+   */
+  @Override
+  public List<KeyValPair<K, V>> getOutput(Map<K, V> accumulatedValue)
+  {
+    LinkedList<KeyValPair<K, V>> result = new LinkedList<>();
+    for (Map.Entry<K, V> entry : accumulatedValue.entrySet()) {
+      // Find the first pair this entry outranks; it is inserted right before that pair.
+      int k = 0;
+      for (KeyValPair<K, V> inMemory : result) {
+        if (isGreater(entry.getValue(), inMemory.getValue())) {
+          break;
+        }
+        k++;
+      }
+      result.add(k, new KeyValPair<K, V>(entry.getKey(), entry.getValue()));
+      if (result.size() > n) {
+        // Drop the tail directly instead of searching for it with remove(Object).
+        result.removeLast();
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Placeholder: currently ignores {@code value} and returns a new empty list.
+   */
+  @Override
+  public List<KeyValPair<K, V>> getRetraction(List<KeyValPair<K, V>> value)
+  {
+    // TODO: Need to add implementation for retraction.
+    return new LinkedList<>();
+  }
+
+  /**
+   * Tells whether {@code candidate} is strictly greater than {@code other}; returns
+   * {@code false} when the values cannot be compared.
+   */
+  @SuppressWarnings("unchecked")
+  private boolean isGreater(V candidate, V other)
+  {
+    if (comparator != null) {
+      return comparator.compare(candidate, other) > 0;
+    }
+    if (candidate instanceof Comparable) {
+      // Unchecked: V is unbounded, so comparability can only be established at runtime.
+      return ((Comparable<V>)candidate).compareTo(other) > 0;
+    }
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Average.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Average.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Average.java
deleted file mode 100644
index 57db6d7..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Average.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang3.tuple.MutablePair;
-
-/**
- * Average Accumulation
- */
-public class Average implements Accumulation<Double, MutablePair<Double, Long>, Double>
-{
- @Override
- public MutablePair<Double, Long> defaultAccumulatedValue()
- {
- return new MutablePair<>(0.0, 0L);
- }
-
- @Override
- public MutablePair<Double, Long> accumulate(MutablePair<Double, Long> accu, Double input)
- {
- accu.setLeft(accu.getLeft() * ((double)accu.getRight() / (accu.getRight() + 1)) + input / (accu.getRight() + 1));
- accu.setRight(accu.getRight() + 1);
- return accu;
- }
-
- @Override
- public MutablePair<Double, Long> merge(MutablePair<Double, Long> accu1, MutablePair<Double, Long> accu2)
- {
- accu1.setLeft(accu1.getLeft() * ((double)accu1.getRight() / accu1.getRight() + accu2.getRight()) +
- accu2.getLeft() * ((double)accu2.getRight() / accu1.getRight() + accu2.getRight()));
- accu1.setRight(accu1.getRight() + accu2.getRight());
- return accu1;
- }
-
- @Override
- public Double getOutput(MutablePair<Double, Long> accumulatedValue)
- {
- return accumulatedValue.getLeft();
- }
-
- @Override
- public Double getRetraction(Double value)
- {
- // TODO: Need to add implementation for retraction.
- return 0.0;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Count.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Count.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Count.java
deleted file mode 100644
index 2c01a0b..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Count.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang3.mutable.MutableLong;
-
-/**
- * Count Accumulation
- */
-public class Count implements Accumulation<Long, MutableLong, Long>
-{
-
- @Override
- public MutableLong defaultAccumulatedValue()
- {
- return new MutableLong(0);
- }
-
- @Override
- public MutableLong accumulate(MutableLong accumulatedValue, Long input)
- {
- accumulatedValue.add(input);
- return accumulatedValue;
- }
-
- @Override
- public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2)
- {
- accumulatedValue1.add(accumulatedValue2);
- return accumulatedValue1;
- }
-
- @Override
- public Long getOutput(MutableLong accumulatedValue)
- {
- return accumulatedValue.getValue();
- }
-
- @Override
- public Long getRetraction(Long value)
- {
- return -value;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFn.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFn.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFn.java
deleted file mode 100644
index 5716cad..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFn.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * Fold Accumulation Adaptor class
- */
-public abstract class FoldFn<INPUT, OUTPUT> implements Accumulation<INPUT, OUTPUT, OUTPUT>
-{
-
- public FoldFn()
- {
- }
-
- public FoldFn(OUTPUT initialVal)
- {
- this.initialVal = initialVal;
- }
-
- private OUTPUT initialVal;
-
- @Override
- public OUTPUT defaultAccumulatedValue()
- {
- return initialVal;
- }
-
- @Override
- public OUTPUT accumulate(OUTPUT accumulatedValue, INPUT input)
- {
- return fold(accumulatedValue, input);
- }
-
- @Override
- public OUTPUT getOutput(OUTPUT accumulatedValue)
- {
- return accumulatedValue;
- }
-
- @Override
- public OUTPUT getRetraction(OUTPUT value)
- {
- return null;
- }
-
- abstract OUTPUT fold(OUTPUT result, INPUT input);
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Group.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Group.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Group.java
deleted file mode 100644
index 632cad5..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Group.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * Group accumulation.
- */
-public class Group<T> implements Accumulation<T, List<T>, List<T>>
-{
- @Override
- public List<T> defaultAccumulatedValue()
- {
- return new ArrayList<>();
- }
-
- @Override
- public List<T> accumulate(List<T> accumulatedValue, T input)
- {
- accumulatedValue.add(input);
- return accumulatedValue;
- }
-
- @Override
- public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2)
- {
- accumulatedValue1.addAll(accumulatedValue2);
- return accumulatedValue1;
- }
-
- @Override
- public List<T> getOutput(List<T> accumulatedValue)
- {
- return accumulatedValue;
- }
-
- @Override
- public List<T> getRetraction(List<T> value)
- {
- // TODO: Need to add implementation for retraction.
- return new ArrayList<>();
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Max.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Max.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Max.java
deleted file mode 100644
index 1002b49..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Max.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.Comparator;
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * Max accumulation.
- */
-public class Max<T> implements Accumulation<T, T, T>
-{
-
- Comparator<T> comparator;
-
- public void setComparator(Comparator<T> comparator)
- {
- this.comparator = comparator;
- }
-
- @Override
- public T defaultAccumulatedValue()
- {
- return null;
- }
-
- @Override
- public T accumulate(T accumulatedValue, T input)
- {
- if (accumulatedValue == null) {
- return input;
- } else if (comparator != null) {
- return (comparator.compare(input, accumulatedValue) > 0) ? input : accumulatedValue;
- } else if (input instanceof Comparable) {
- return (((Comparable)input).compareTo(accumulatedValue) > 0) ? input : accumulatedValue;
- } else {
- throw new RuntimeException("Tuple cannot be compared");
- }
- }
-
- @Override
- public T merge(T accumulatedValue1, T accumulatedValue2)
- {
- return accumulate(accumulatedValue1, accumulatedValue2);
- }
-
- @Override
- public T getOutput(T accumulatedValue)
- {
- return accumulatedValue;
- }
-
- @Override
- public T getRetraction(T value)
- {
- // TODO: Need to add implementation for retraction.
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Min.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Min.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Min.java
deleted file mode 100644
index 66248f4..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Min.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.Comparator;
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * Min accumulation
- */
-public class Min<T> implements Accumulation<T, T, T>
-{
-
- Comparator<T> comparator;
-
- public void setComparator(Comparator<T> comparator)
- {
- this.comparator = comparator;
- }
-
- @Override
- public T defaultAccumulatedValue()
- {
- return null;
- }
-
- @Override
- public T accumulate(T accumulatedValue, T input)
- {
- if (accumulatedValue == null) {
- return input;
- } else if (comparator != null) {
- return (comparator.compare(input, accumulatedValue) < 0) ? input : accumulatedValue;
- } else if (input instanceof Comparable) {
- return (((Comparable)input).compareTo(accumulatedValue) < 0) ? input : accumulatedValue;
- } else {
- throw new RuntimeException("Tuple cannot be compared");
- }
- }
-
- @Override
- public T merge(T accumulatedValue1, T accumulatedValue2)
- {
- return accumulate(accumulatedValue1, accumulatedValue2);
- }
-
- @Override
- public T getOutput(T accumulatedValue)
- {
- return accumulatedValue;
- }
-
- @Override
- public T getRetraction(T value)
- {
- // TODO: Need to add implementation for retraction.
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFn.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFn.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFn.java
deleted file mode 100644
index c21ab32..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFn.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * An easy to use reduce Accumulation
- * @param <INPUT>
- */
-public abstract class ReduceFn<INPUT> implements Accumulation<INPUT, INPUT, INPUT>
-{
- @Override
- public INPUT defaultAccumulatedValue()
- {
- return null;
- }
-
- @Override
- public INPUT accumulate(INPUT accumulatedValue, INPUT input)
- {
- if (accumulatedValue == null) {
- return input;
- }
- return reduce(accumulatedValue, input);
- }
-
- @Override
- public INPUT merge(INPUT accumulatedValue1, INPUT accumulatedValue2)
- {
- return reduce(accumulatedValue1, accumulatedValue2);
- }
-
- @Override
- public INPUT getOutput(INPUT accumulatedValue)
- {
- return accumulatedValue;
- }
-
- @Override
- public INPUT getRetraction(INPUT value)
- {
- return null;
- }
-
- public abstract INPUT reduce(INPUT input1, INPUT input2);
-
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicates.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicates.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicates.java
deleted file mode 100644
index b7cd770..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicates.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * RemoveDuplicates Accumulation.
- * @param <T>
- */
-public class RemoveDuplicates<T> implements Accumulation<T, Set<T>, List<T>>
-{
- @Override
- public Set<T> defaultAccumulatedValue()
- {
- return new HashSet<>();
- }
-
- @Override
- public Set<T> accumulate(Set<T> accumulatedValue, T input)
- {
- accumulatedValue.add(input);
- return accumulatedValue;
- }
-
- @Override
- public Set<T> merge(Set<T> accumulatedValue1, Set<T> accumulatedValue2)
- {
- for (T item : accumulatedValue2) {
- accumulatedValue1.add(item);
- }
- return accumulatedValue1;
- }
-
- @Override
- public List<T> getOutput(Set<T> accumulatedValue)
- {
- if (accumulatedValue == null) {
- return new ArrayList<>();
- } else {
- return new ArrayList<>(accumulatedValue);
- }
- }
-
- @Override
- public List<T> getRetraction(List<T> value)
- {
- // TODO: Need to add implementation for retraction.
- return new ArrayList<>(value);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumDouble.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumDouble.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumDouble.java
deleted file mode 100644
index 60b195b..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumDouble.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang.mutable.MutableDouble;
-
-/**
- * Sum Accumulation for doubles.
- */
-public class SumDouble implements Accumulation<Double, MutableDouble, Double>
-{
- @Override
- public MutableDouble defaultAccumulatedValue()
- {
- return new MutableDouble(0.0);
- }
-
- @Override
- public MutableDouble accumulate(MutableDouble accumulatedValue, Double input)
- {
- accumulatedValue.add(input);
- return accumulatedValue;
- }
-
- @Override
- public MutableDouble merge(MutableDouble accumulatedValue1, MutableDouble accumulatedValue2)
- {
- accumulatedValue1.add(accumulatedValue2);
- return accumulatedValue1;
- }
-
- @Override
- public Double getOutput(MutableDouble accumulatedValue)
- {
- return accumulatedValue.doubleValue();
- }
-
- @Override
- public Double getRetraction(Double value)
- {
- return -value;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumFloat.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumFloat.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumFloat.java
deleted file mode 100644
index 14e69e2..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumFloat.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang.mutable.MutableFloat;
-
-/**
- * Sum Accumulation for floats.
- */
-public class SumFloat implements Accumulation<Float, MutableFloat, Float>
-{
- @Override
- public MutableFloat defaultAccumulatedValue()
- {
- return new MutableFloat(0.);
- }
-
- @Override
- public MutableFloat accumulate(MutableFloat accumulatedValue, Float input)
- {
- accumulatedValue.add(input);
- return accumulatedValue;
- }
-
- @Override
- public MutableFloat merge(MutableFloat accumulatedValue1, MutableFloat accumulatedValue2)
- {
- accumulatedValue1.add(accumulatedValue2);
- return accumulatedValue1;
- }
-
- @Override
- public Float getOutput(MutableFloat accumulatedValue)
- {
- return accumulatedValue.floatValue();
- }
-
- @Override
- public Float getRetraction(Float value)
- {
- return -value;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumInt.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumInt.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumInt.java
deleted file mode 100644
index 886a7d0..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumInt.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang.mutable.MutableInt;
-
-/**
- * Sum accumulation for integers.
- */
-public class SumInt implements Accumulation<Integer, MutableInt, Integer>
-{
- @Override
- public MutableInt defaultAccumulatedValue()
- {
- return new MutableInt(0);
- }
-
- @Override
- public MutableInt accumulate(MutableInt accumulatedValue, Integer input)
- {
- accumulatedValue.add(input);
- return accumulatedValue;
- }
-
- @Override
- public MutableInt merge(MutableInt accumulatedValue1, MutableInt accumulatedValue2)
- {
- accumulatedValue1.add(accumulatedValue2);
- return accumulatedValue1;
- }
-
- @Override
- public Integer getOutput(MutableInt accumulatedValue)
- {
- return accumulatedValue.intValue();
- }
-
- @Override
- public Integer getRetraction(Integer value)
- {
- return -value;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumLong.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumLong.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumLong.java
deleted file mode 100644
index 469eef9..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumLong.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang.mutable.MutableLong;
-
-/**
- * Sum accumulation for longs.
- */
-public class SumLong implements Accumulation<Long, MutableLong, Long>
-{
- @Override
- public MutableLong defaultAccumulatedValue()
- {
- return new MutableLong(0L);
- }
-
- @Override
- public MutableLong accumulate(MutableLong accumulatedValue, Long input)
- {
- accumulatedValue.add(input);
- return accumulatedValue;
- }
-
- @Override
- public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2)
- {
- accumulatedValue1.add(accumulatedValue2);
- return accumulatedValue1;
- }
-
- @Override
- public Long getOutput(MutableLong accumulatedValue)
- {
- return accumulatedValue.longValue();
- }
-
- @Override
- public Long getRetraction(Long value)
- {
- return -value;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopN.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopN.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopN.java
deleted file mode 100644
index 7dad8cc..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopN.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * TopN accumulation
- */
-public class TopN<T> implements Accumulation<T, List<T>, List<T>>
-{
- int n;
-
- Comparator<T> comparator;
-
- public void setN(int n)
- {
- this.n = n;
- }
-
- public void setComparator(Comparator<T> comparator)
- {
- this.comparator = comparator;
- }
-
- @Override
- public List<T> defaultAccumulatedValue()
- {
- return new LinkedList<>();
- }
-
- @Override
- public List<T> accumulate(List<T> accumulatedValue, T input)
- {
- int k = 0;
- for (T inMemory : accumulatedValue) {
- if (comparator != null) {
- if (comparator.compare(inMemory, input) < 0) {
- break;
- }
- } else if (input instanceof Comparable) {
- if (((Comparable<T>)input).compareTo(inMemory) > 0) {
- break;
- }
- } else {
- throw new RuntimeException("Tuple cannot be compared");
- }
- k++;
- }
- accumulatedValue.add(k, input);
- if (accumulatedValue.size() > n) {
- accumulatedValue.remove(accumulatedValue.get(accumulatedValue.size() - 1));
- }
- return accumulatedValue;
- }
-
- @Override
- public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2)
- {
- accumulatedValue1.addAll(accumulatedValue2);
- if (comparator != null) {
- Collections.sort(accumulatedValue1, Collections.reverseOrder(comparator));
- } else {
- Collections.sort(accumulatedValue1, Collections.reverseOrder());
- }
- if (accumulatedValue1.size() > n) {
- return accumulatedValue1.subList(0, n);
- } else {
- return accumulatedValue1;
- }
- }
-
- @Override
- public List<T> getOutput(List<T> accumulatedValue)
- {
- return accumulatedValue;
- }
-
- @Override
- public List<T> getRetraction(List<T> accumulatedValue)
- {
- return new LinkedList<>();
- }
-}
-
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKey.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKey.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKey.java
deleted file mode 100644
index d9f9cfd..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKey.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- * Generalized TopNByKey accumulation.
- *
- * Accumulates key/value pairs into a map holding one value per key and
- * outputs at most n pairs, ordered from the greatest value to the least.
- * Values are ranked with the configured comparator when one is set, and
- * otherwise by their own Comparable ordering.
- *
- * @param <K> key type
- * @param <V> value type
- */
-public class TopNByKey<K, V> implements
- Accumulation<KeyValPair<K, V>, Map<K, V>, List<KeyValPair<K, V>>>
-{
- // Maximum number of pairs returned by getOutput(); defaults to 10.
- int n = 10;
-
- // Optional ranking for values; stays null unless setComparator() is called.
- Comparator<V> comparator;
-
- /**
- * Sets the maximum number of pairs that getOutput() returns.
- *
- * @param n the new limit
- */
- public void setN(int n)
- {
- this.n = n;
- }
-
- /**
- * Sets the comparator used to rank values.
- *
- * @param comparator the comparator to rank values with; stored as given
- */
- public void setComparator(Comparator<V> comparator)
- {
- this.comparator = comparator;
- }
-
- /**
- * Supplies the initial accumulated value.
- *
- * @return a new, empty HashMap
- */
- @Override
- public Map<K, V> defaultAccumulatedValue()
- {
- return new HashMap<>();
- }
-
- /**
- * Stores the input's value under the input's key, replacing any value that
- * was already stored for that key.
- *
- * @param accumulatedValue map to update; modified in place
- * @param input key/value pair to record
- * @return the same map instance that was passed in
- */
- @Override
- public Map<K, V> accumulate(Map<K, V> accumulatedValue, KeyValPair<K, V> input)
- {
- // NOTE(review): this overwrites unconditionally, whereas merge() keeps the
- // greater value on a key collision - confirm the difference is intended.
- accumulatedValue.put(input.getKey(), input.getValue());
- return accumulatedValue;
- }
-
- /**
- * Merges the second map into the first. A key missing from the first map is
- * copied over; for a key present in both, the second map's value replaces
- * the first's only when it compares greater.
- *
- * @param accumulatedValue1 map that receives the merged content; modified in place
- * @param accumulatedValue2 map whose entries are merged in
- * @return accumulatedValue1
- */
- @Override
- public Map<K, V> merge(Map<K, V> accumulatedValue1, Map<K, V> accumulatedValue2)
- {
- for (Map.Entry<K, V> entry : accumulatedValue2.entrySet()) {
- if (!accumulatedValue1.containsKey(entry.getKey())) {
- accumulatedValue1.put(entry.getKey(), entry.getValue());
- } else if (comparator != null) {
- if (comparator.compare(entry.getValue(), accumulatedValue1.get(entry.getKey())) > 0) {
- accumulatedValue1.put(entry.getKey(), entry.getValue());
- }
- } else if (entry.getValue() instanceof Comparable) {
- // Unchecked cast: natural ordering is used when no comparator is set.
- if (((Comparable<V>)entry.getValue()).compareTo(accumulatedValue1.get(entry.getKey())) > 0) {
- accumulatedValue1.put(entry.getKey(), entry.getValue());
- }
- }
- // With no comparator and a non-Comparable value, the first map's value is kept.
- }
- return accumulatedValue1;
- }
-
- /**
- * Builds the output list by inserting every map entry in front of the first
- * already-placed pair whose value it exceeds, trimming the list whenever it
- * grows past n.
- *
- * @param accumulatedValue map of accumulated values per key
- * @return a new list of at most n pairs, greatest value first
- */
- @Override
- public List<KeyValPair<K, V>> getOutput(Map<K, V> accumulatedValue)
- {
- LinkedList<KeyValPair<K, V>> result = new LinkedList<>();
- for (Map.Entry<K, V> entry : accumulatedValue.entrySet()) {
- // k ends up as the index at which this entry is inserted.
- int k = 0;
- for (KeyValPair<K, V> inMemory : result) {
- if (comparator != null) {
- if (comparator.compare(entry.getValue(), inMemory.getValue()) > 0) {
- break;
- }
- } else if (entry.getValue() instanceof Comparable) {
- if (((Comparable<V>)entry.getValue()).compareTo(inMemory.getValue()) > 0) {
- break;
- }
- }
- // NOTE(review): with no comparator and a non-Comparable value neither
- // branch breaks, so the entry is appended at the end without an error.
- k++;
- }
- result.add(k, new KeyValPair<K, V>(entry.getKey(), entry.getValue()));
- if (result.size() > n) {
- // NOTE(review): this resolves to remove(Object) and so depends on
- // KeyValPair equality - confirm it always removes the last pair.
- result.remove(result.get(result.size() - 1));
- }
- }
- return result;
- }
-
- /**
- * Produces the retraction for a previously emitted value. The argument is
- * not consulted.
- *
- * @param value ignored
- * @return a new, empty LinkedList
- */
- @Override
- public List<KeyValPair<K, V>> getRetraction(List<KeyValPair<K, V>> value)
- {
- // TODO: Need to add implementation for retraction.
- return new LinkedList<>();
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/AverageTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/AverageTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/AverageTest.java
new file mode 100644
index 0000000..e5fd541
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/AverageTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.commons.lang3.tuple.MutablePair;
+
+/**
+ * Test for {@link Average}.
+ */
+public class AverageTest
+{
+ /**
+ * Accumulates the values 1 through 10 and expects the left element of the
+ * resulting pair to be 5.5, the mean of those values.
+ */
+ // NOTE(review): the method shares its name with the class, which reads like
+ // a constructor - consider a lowerCamelCase test name.
+ @Test
+ public void AverageTest()
+ {
+ Average ave = new Average();
+ MutablePair<Double, Long> accu = ave.defaultAccumulatedValue();
+
+ // Feed the integers 1..10 as doubles, carrying the accumulated pair forward.
+ for (int i = 1; i <= 10; i++) {
+ accu = ave.accumulate(accu, (double)i);
+ }
+ // NOTE(review): exact floating-point equality; Assert.assertEquals with a
+ // delta would tolerate rounding in the accumulation - confirm and adjust.
+ Assert.assertTrue(5.5 == accu.getLeft());
+ }
+}