You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/08/25 23:28:13 UTC

[2/2] apex-malhar git commit: APEXMALHAR-2202 Moved accumulations to org.apache.apex.malhar.lib.window.accumulation.

APEXMALHAR-2202 Moved accumulations to org.apache.apex.malhar.lib.window.accumulation.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/2b2d5bca
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/2b2d5bca
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/2b2d5bca

Branch: refs/heads/master
Commit: 2b2d5bca91b23569e04cbc51a541126588eaab44
Parents: dcca775
Author: Shunxin <lu...@hotmail.com>
Authored: Thu Aug 25 13:37:46 2016 -0700
Committer: Shunxin <lu...@hotmail.com>
Committed: Thu Aug 25 13:37:46 2016 -0700

----------------------------------------------------------------------
 .../sample/complete/TopWikipediaSessions.java   |   2 +-
 .../stream/sample/complete/TrafficRoutes.java   |   2 +-
 .../sample/cookbook/CombinePerKeyExamples.java  |   2 +-
 .../stream/sample/cookbook/DeDupExample.java    |   2 +-
 .../sample/cookbook/MaxPerKeyExamples.java      |   2 +-
 .../malhar/lib/window/accumulation/Average.java |  64 +++++++++
 .../malhar/lib/window/accumulation/Count.java   |  61 +++++++++
 .../malhar/lib/window/accumulation/FoldFn.java  |  65 ++++++++++
 .../malhar/lib/window/accumulation/Group.java   |  63 +++++++++
 .../malhar/lib/window/accumulation/Max.java     |  75 +++++++++++
 .../malhar/lib/window/accumulation/Min.java     |  76 +++++++++++
 .../lib/window/accumulation/ReduceFn.java       |  65 ++++++++++
 .../window/accumulation/RemoveDuplicates.java   |  72 +++++++++++
 .../lib/window/accumulation/SumDouble.java      |  60 +++++++++
 .../lib/window/accumulation/SumFloat.java       |  60 +++++++++
 .../malhar/lib/window/accumulation/SumInt.java  |  60 +++++++++
 .../malhar/lib/window/accumulation/SumLong.java |  60 +++++++++
 .../malhar/lib/window/accumulation/TopN.java    | 106 +++++++++++++++
 .../lib/window/accumulation/TopNByKey.java      | 114 ++++++++++++++++
 .../lib/window/impl/accumulation/Average.java   |  64 ---------
 .../lib/window/impl/accumulation/Count.java     |  61 ---------
 .../lib/window/impl/accumulation/FoldFn.java    |  65 ----------
 .../lib/window/impl/accumulation/Group.java     |  63 ---------
 .../lib/window/impl/accumulation/Max.java       |  75 -----------
 .../lib/window/impl/accumulation/Min.java       |  76 -----------
 .../lib/window/impl/accumulation/ReduceFn.java  |  65 ----------
 .../impl/accumulation/RemoveDuplicates.java     |  72 -----------
 .../lib/window/impl/accumulation/SumDouble.java |  60 ---------
 .../lib/window/impl/accumulation/SumFloat.java  |  60 ---------
 .../lib/window/impl/accumulation/SumInt.java    |  60 ---------
 .../lib/window/impl/accumulation/SumLong.java   |  60 ---------
 .../lib/window/impl/accumulation/TopN.java      | 106 ---------------
 .../lib/window/impl/accumulation/TopNByKey.java | 114 ----------------
 .../lib/window/accumulation/AverageTest.java    |  41 ++++++
 .../lib/window/accumulation/FoldFnTest.java     | 129 +++++++++++++++++++
 .../lib/window/accumulation/GroupTest.java      |  41 ++++++
 .../malhar/lib/window/accumulation/MaxTest.java |  53 ++++++++
 .../malhar/lib/window/accumulation/MinTest.java |  53 ++++++++
 .../lib/window/accumulation/ReduceFnTest.java   |  50 +++++++
 .../accumulation/RemoveDuplicatesTest.java      |  41 ++++++
 .../malhar/lib/window/accumulation/SumTest.java |  57 ++++++++
 .../lib/window/accumulation/TopNByKeyTest.java  |  74 +++++++++++
 .../window/impl/accumulation/AverageTest.java   |  41 ------
 .../window/impl/accumulation/FoldFnTest.java    | 129 -------------------
 .../lib/window/impl/accumulation/GroupTest.java |  42 ------
 .../lib/window/impl/accumulation/MaxTest.java   |  53 --------
 .../lib/window/impl/accumulation/MinTest.java   |  53 --------
 .../window/impl/accumulation/ReduceFnTest.java  |  50 -------
 .../impl/accumulation/RemoveDuplicatesTest.java |  42 ------
 .../lib/window/impl/accumulation/SumTest.java   |  57 --------
 .../window/impl/accumulation/TopNByKeyTest.java |  75 -----------
 .../apex/malhar/stream/api/WindowedStream.java  |   4 +-
 .../stream/api/impl/ApexWindowedStreamImpl.java |   8 +-
 53 files changed, 1551 insertions(+), 1554 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
index de4e590..f2e70b1 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
@@ -29,7 +29,7 @@ import org.joda.time.Duration;
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.lib.window.impl.accumulation.TopN;
+import org.apache.apex.malhar.lib.window.accumulation.TopN;
 import org.apache.apex.malhar.stream.api.ApexStream;
 import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
 import org.apache.apex.malhar.stream.api.WindowedStream;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
index 2cc04d1..26a2823 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
@@ -32,7 +32,7 @@ import org.joda.time.Duration;
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.lib.window.impl.accumulation.Group;
+import org.apache.apex.malhar.lib.window.accumulation.Group;
 import org.apache.apex.malhar.stream.api.ApexStream;
 import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
 import org.apache.apex.malhar.stream.api.WindowedStream;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
index ecd71ae..d88a8dc 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
@@ -24,7 +24,7 @@ import java.util.List;
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.lib.window.impl.accumulation.ReduceFn;
+import org.apache.apex.malhar.lib.window.accumulation.ReduceFn;
 import org.apache.apex.malhar.stream.api.ApexStream;
 import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
 import org.apache.apex.malhar.stream.api.WindowedStream;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
index 53426f3..2930010 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
@@ -26,7 +26,7 @@ import org.joda.time.Duration;
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.lib.window.impl.accumulation.RemoveDuplicates;
+import org.apache.apex.malhar.lib.window.accumulation.RemoveDuplicates;
 import org.apache.apex.malhar.stream.api.ApexStream;
 import org.apache.apex.malhar.stream.api.function.Function;
 import org.apache.apex.malhar.stream.api.impl.StreamFactory;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
index 97b2696..02980e4 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
@@ -24,7 +24,7 @@ import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.Window;
 import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.lib.window.impl.accumulation.Max;
+import org.apache.apex.malhar.lib.window.accumulation.Max;
 import org.apache.apex.malhar.stream.api.ApexStream;
 import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
 import org.apache.apex.malhar.stream.api.WindowedStream;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java
new file mode 100644
index 0000000..c669439
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang3.tuple.MutablePair;
+
+/**
+ * Average accumulation; the accumulated pair holds the running mean (left) and the number of values (right).
+ */
+public class Average implements Accumulation<Double, MutablePair<Double, Long>, Double>
+{
+  /** Creates the empty state: a mean of 0.0 over zero values. */
+  @Override
+  public MutablePair<Double, Long> defaultAccumulatedValue()
+  {
+    return new MutablePair<>(0.0, 0L);
+  }
+  /** Folds input into the running mean, increments the count and returns the same (mutated) pair. */
+  @Override
+  public MutablePair<Double, Long> accumulate(MutablePair<Double, Long> accu, Double input)
+  {
+    accu.setLeft(accu.getLeft() * ((double)accu.getRight() / (accu.getRight() + 1)) + input / (accu.getRight() + 1));
+    accu.setRight(accu.getRight() + 1);
+    return accu;
+  }
+  /** Merges accu2 into accu1 as the count-weighted mean of both means (0.0 when both are empty); returns accu1. */
+  @Override
+  public MutablePair<Double, Long> merge(MutablePair<Double, Long> accu1, MutablePair<Double, Long> accu2)
+  {
+    long total = accu1.getRight() + accu2.getRight();
+    accu1.setLeft(total == 0 ? 0.0 : (accu1.getLeft() * accu1.getRight() + accu2.getLeft() * accu2.getRight()) / total);
+    accu1.setRight(total);
+    return accu1;
+  }
+  /** Returns the running mean stored in the left element of the pair. */
+  @Override
+  public Double getOutput(MutablePair<Double, Long> accumulatedValue)
+  {
+    return accumulatedValue.getLeft();
+  }
+  /** TODO: Need to add implementation for retraction; currently always returns 0.0. */
+  @Override
+  public Double getRetraction(Double value)
+  {
+    return 0.0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
new file mode 100644
index 0000000..180152b
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang3.mutable.MutableLong;
+
+/**
+ * Count accumulation: keeps a running total in a MutableLong and reports it as a Long.
+ */
+public class Count implements Accumulation<Long, MutableLong, Long>
+{
+  /** Starts the running total at zero. */
+  @Override
+  public MutableLong defaultAccumulatedValue()
+  {
+    return new MutableLong(0L);
+  }
+  /** Adds input to the running total in place. NOTE(review): presumably callers pass 1 per event - confirm. */
+  @Override
+  public MutableLong accumulate(MutableLong accumulatedValue, Long input)
+  {
+    accumulatedValue.add(input.longValue());
+    return accumulatedValue;
+  }
+  /** Adds the second total onto the first one and returns the first. */
+  @Override
+  public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2)
+  {
+    accumulatedValue1.add(accumulatedValue2.longValue());
+    return accumulatedValue1;
+  }
+  /** Returns the current total as a boxed Long. */
+  @Override
+  public Long getOutput(MutableLong accumulatedValue)
+  {
+    return Long.valueOf(accumulatedValue.longValue());
+  }
+  /** Returns the negated value. */
+  @Override
+  public Long getRetraction(Long value)
+  {
+    return Long.valueOf(-value.longValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/FoldFn.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/FoldFn.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/FoldFn.java
new file mode 100644
index 0000000..dc344b3
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/FoldFn.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * Fold accumulation adaptor: subclasses supply fold; the accumulated value doubles as the output.
+ */
+public abstract class FoldFn<INPUT, OUTPUT> implements Accumulation<INPUT, OUTPUT, OUTPUT>
+{
+  /** Value handed out by defaultAccumulatedValue(); stays null when the no-arg constructor is used. */
+  private OUTPUT initialVal;
+  /** Combines the value accumulated so far with one input and returns the new accumulated value. */
+  abstract OUTPUT fold(OUTPUT result, INPUT input);
+  /** Creates a fold whose initial accumulated value is null. */
+  public FoldFn()
+  {
+  }
+  /** Creates a fold that starts from initialVal. */
+  public FoldFn(OUTPUT initialVal)
+  {
+    this.initialVal = initialVal;
+  }
+  /** Returns the configured initial value itself (the same reference on every call, not a copy). */
+  @Override
+  public OUTPUT defaultAccumulatedValue()
+  {
+    return this.initialVal;
+  }
+  /** Delegates to fold. */
+  @Override
+  public OUTPUT accumulate(OUTPUT accumulatedValue, INPUT input)
+  {
+    return this.fold(accumulatedValue, input);
+  }
+  /** Emits the accumulated value unchanged. */
+  @Override
+  public OUTPUT getOutput(OUTPUT accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+  /** Always returns null; no retraction value is computed. */
+  @Override
+  public OUTPUT getRetraction(OUTPUT value)
+  {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java
new file mode 100644
index 0000000..c34a8ac
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * Group accumulation: collects every input into one list, in arrival order.
+ */
+public class Group<T> implements Accumulation<T, List<T>, List<T>>
+{
+  /** Starts from a new, empty ArrayList. */
+  @Override
+  public List<T> defaultAccumulatedValue()
+  {
+    return new ArrayList<T>();
+  }
+  /** Appends input to the list in place and returns that same list. */
+  @Override
+  public List<T> accumulate(List<T> accumulatedValue, T input)
+  {
+    accumulatedValue.add(input);
+    return accumulatedValue;
+  }
+  /** Appends all elements of the second list to the first one and returns the first. */
+  @Override
+  public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2)
+  {
+    accumulatedValue1.addAll(accumulatedValue2);
+    return accumulatedValue1;
+  }
+  /** Emits the accumulated list itself (no copy is made). */
+  @Override
+  public List<T> getOutput(List<T> accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+  /** TODO: Need to add implementation for retraction; currently returns a new empty list. */
+  @Override
+  public List<T> getRetraction(List<T> value)
+  {
+    return new ArrayList<T>();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java
new file mode 100644
index 0000000..4164bb2
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.Comparator;
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * Max accumulation: keeps the largest value seen, ordered by the comparator if set, else by natural ordering.
+ */
+public class Max<T> implements Accumulation<T, T, T>
+{
+  /** Optional ordering; when null, inputs must implement Comparable. */
+  Comparator<T> comparator;
+  /** Sets the comparator used to order values; null selects natural ordering. */
+  public void setComparator(Comparator<T> comparator)
+  {
+    this.comparator = comparator;
+  }
+  /** Returns null, which accumulate and merge treat as "no value yet". */
+  @Override
+  public T defaultAccumulatedValue()
+  {
+    return null;
+  }
+  /** Returns the larger of accumulatedValue and input; throws RuntimeException if they cannot be compared. */
+  @SuppressWarnings("unchecked") // the cast to Comparable<T> is guarded by the instanceof check
+  @Override
+  public T accumulate(T accumulatedValue, T input)
+  {
+    if (accumulatedValue == null) {
+      return input;
+    } else if (comparator != null) {
+      return (comparator.compare(input, accumulatedValue) > 0) ? input : accumulatedValue;
+    } else if (input instanceof Comparable) {
+      return (((Comparable<T>)input).compareTo(accumulatedValue) > 0) ? input : accumulatedValue;
+    } else {
+      throw new RuntimeException("Tuple cannot be compared");
+    }
+  }
+  /** Returns the larger accumulated value; a null (empty) side yields the other side instead of being compared. */
+  @Override
+  public T merge(T accumulatedValue1, T accumulatedValue2)
+  {
+    return (accumulatedValue2 == null) ? accumulatedValue1 : accumulate(accumulatedValue1, accumulatedValue2);
+  }
+  /** Emits the accumulated maximum unchanged (null if nothing was accumulated). */
+  @Override
+  public T getOutput(T accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+  /** TODO: Need to add implementation for retraction; currently always returns null. */
+  @Override
+  public T getRetraction(T value)
+  {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java
new file mode 100644
index 0000000..0460052
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.Comparator;
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * Min accumulation: keeps the smallest value seen, ordered by the comparator if set, else by natural ordering.
+ */
+public class Min<T> implements Accumulation<T, T, T>
+{
+  /** Optional ordering; when null, inputs must implement Comparable. */
+  Comparator<T> comparator;
+  /** Sets the comparator used to order values; null selects natural ordering. */
+  public void setComparator(Comparator<T> comparator)
+  {
+    this.comparator = comparator;
+  }
+  /** Returns null, which accumulate and merge treat as "no value yet". */
+  @Override
+  public T defaultAccumulatedValue()
+  {
+    return null;
+  }
+  /** Returns the smaller of accumulatedValue and input; throws RuntimeException if they cannot be compared. */
+  @SuppressWarnings("unchecked") // the cast to Comparable<T> is guarded by the instanceof check
+  @Override
+  public T accumulate(T accumulatedValue, T input)
+  {
+    if (accumulatedValue == null) {
+      return input;
+    } else if (comparator != null) {
+      return (comparator.compare(input, accumulatedValue) < 0) ? input : accumulatedValue;
+    } else if (input instanceof Comparable) {
+      return (((Comparable<T>)input).compareTo(accumulatedValue) < 0) ? input : accumulatedValue;
+    } else {
+      throw new RuntimeException("Tuple cannot be compared");
+    }
+  }
+  /** Returns the smaller accumulated value; a null (empty) side yields the other side. */
+  @Override
+  public T merge(T accumulatedValue1, T accumulatedValue2)
+  {
+    // A null second value is the empty state from defaultAccumulatedValue(): nothing to compare against.
+    return (accumulatedValue2 == null) ? accumulatedValue1 : accumulate(accumulatedValue1, accumulatedValue2);
+  }
+  /** Emits the accumulated minimum unchanged (null if nothing was accumulated). */
+  @Override
+  public T getOutput(T accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+  /** TODO: Need to add implementation for retraction; currently always returns null. */
+  @Override
+  public T getRetraction(T value)
+  {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFn.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFn.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFn.java
new file mode 100644
index 0000000..0c34c75
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFn.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * An easy to use reduce Accumulation: subclasses only provide reduce; null stands for "nothing accumulated yet".
+ * @param <INPUT> type of the inputs, of the accumulated value and of the output
+ */
+public abstract class ReduceFn<INPUT> implements Accumulation<INPUT, INPUT, INPUT>
+{
+  /** Returns null, the marker for an empty accumulation. */
+  @Override
+  public INPUT defaultAccumulatedValue()
+  {
+    return null;
+  }
+  /** Returns input for an empty accumulation, otherwise reduce(accumulatedValue, input). */
+  @Override
+  public INPUT accumulate(INPUT accumulatedValue, INPUT input)
+  {
+    return (accumulatedValue == null) ? input : reduce(accumulatedValue, input);
+  }
+  /** Reduces two accumulated values into one; an empty (null) side yields the other side without calling reduce. */
+  @Override
+  public INPUT merge(INPUT accumulatedValue1, INPUT accumulatedValue2)
+  {
+    // Mirrors the null guard in accumulate so that reduce never receives the empty marker.
+    if (accumulatedValue1 == null) {
+      return accumulatedValue2;
+    }
+    return (accumulatedValue2 == null) ? accumulatedValue1 : reduce(accumulatedValue1, accumulatedValue2);
+  }
+  /** Emits the accumulated value unchanged. */
+  @Override
+  public INPUT getOutput(INPUT accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+  /** Always returns null; no retraction value is computed. */
+  @Override
+  public INPUT getRetraction(INPUT value)
+  {
+    return null;
+  }
+  /** Combines two non-null values into one; called by accumulate and merge. */
+  public abstract INPUT reduce(INPUT input1, INPUT input2);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java
new file mode 100644
index 0000000..8eee868
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * RemoveDuplicates Accumulation: collects inputs into a set and outputs the distinct values as a list.
+ * @param <T> element type
+ */
+public class RemoveDuplicates<T> implements Accumulation<T, Set<T>, List<T>>
+{
+  /** Starts from a new, empty HashSet. */
+  @Override
+  public Set<T> defaultAccumulatedValue()
+  {
+    return new HashSet<T>();
+  }
+  /** Adds input to the set in place (a value already present changes nothing) and returns the set. */
+  @Override
+  public Set<T> accumulate(Set<T> accumulatedValue, T input)
+  {
+    accumulatedValue.add(input);
+    return accumulatedValue;
+  }
+  /** Adds every element of the second set to the first one and returns the first. */
+  @Override
+  public Set<T> merge(Set<T> accumulatedValue1, Set<T> accumulatedValue2)
+  {
+    accumulatedValue1.addAll(accumulatedValue2);
+    return accumulatedValue1;
+  }
+  /** Copies the distinct values into a new list, in the set's iteration order. */
+  @Override
+  public List<T> getOutput(Set<T> accumulatedValue)
+  {
+    // A null accumulation is treated as empty.
+    List<T> distinct = new ArrayList<T>();
+    if (accumulatedValue != null) {
+      distinct.addAll(accumulatedValue);
+    }
+    return distinct;
+  }
+  /** Returns a new list holding the same elements as value. */
+  @Override
+  public List<T> getRetraction(List<T> value)
+  {
+    // TODO: Need to add implementation for retraction.
+    return new ArrayList<T>(value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
new file mode 100644
index 0000000..fafff43
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang.mutable.MutableDouble;
+
+/**
+ * Sum accumulation for {@link Double} tuples; the running total is held in a
+ * {@link MutableDouble}.
+ */
+public class SumDouble implements Accumulation<Double, MutableDouble, Double>
+{
+  /**
+   * @return a new accumulator holding 0.0
+   */
+  @Override
+  public MutableDouble defaultAccumulatedValue()
+  {
+    final double zero = 0.0;
+    return new MutableDouble(zero);
+  }
+
+  /**
+   * Adds one tuple to the running total.
+   *
+   * @param accumulatedValue running total, modified in place
+   * @param input value to add
+   * @return the same accumulator instance
+   */
+  @Override
+  public MutableDouble accumulate(MutableDouble accumulatedValue, Double input)
+  {
+    accumulatedValue.add(input.doubleValue());
+    return accumulatedValue;
+  }
+
+  /**
+   * Adds the second total onto the first.
+   *
+   * @param accumulatedValue1 total that receives the sum, modified in place
+   * @param accumulatedValue2 total to add
+   * @return {@code accumulatedValue1}
+   */
+  @Override
+  public MutableDouble merge(MutableDouble accumulatedValue1, MutableDouble accumulatedValue2)
+  {
+    accumulatedValue1.add(accumulatedValue2.doubleValue());
+    return accumulatedValue1;
+  }
+
+  /**
+   * @param accumulatedValue running total
+   * @return the total as a {@link Double}
+   */
+  @Override
+  public Double getOutput(MutableDouble accumulatedValue)
+  {
+    return Double.valueOf(accumulatedValue.doubleValue());
+  }
+
+  /**
+   * @param value previously emitted sum
+   * @return the negated sum
+   */
+  @Override
+  public Double getRetraction(Double value)
+  {
+    return Double.valueOf(-value.doubleValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
new file mode 100644
index 0000000..2e2d3ef
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang.mutable.MutableFloat;
+
+/**
+ * Sum accumulation for {@link Float} tuples; the running total is held in a
+ * {@link MutableFloat}.
+ */
+public class SumFloat implements Accumulation<Float, MutableFloat, Float>
+{
+  /**
+   * @return a new accumulator holding zero
+   */
+  @Override
+  public MutableFloat defaultAccumulatedValue()
+  {
+    final float zero = 0.0f;
+    return new MutableFloat(zero);
+  }
+
+  /**
+   * Adds one tuple to the running total.
+   *
+   * @param accumulatedValue running total, modified in place
+   * @param input value to add
+   * @return the same accumulator instance
+   */
+  @Override
+  public MutableFloat accumulate(MutableFloat accumulatedValue, Float input)
+  {
+    accumulatedValue.add(input.floatValue());
+    return accumulatedValue;
+  }
+
+  /**
+   * Adds the second total onto the first.
+   *
+   * @param accumulatedValue1 total that receives the sum, modified in place
+   * @param accumulatedValue2 total to add
+   * @return {@code accumulatedValue1}
+   */
+  @Override
+  public MutableFloat merge(MutableFloat accumulatedValue1, MutableFloat accumulatedValue2)
+  {
+    accumulatedValue1.add(accumulatedValue2.floatValue());
+    return accumulatedValue1;
+  }
+
+  /**
+   * @param accumulatedValue running total
+   * @return the total as a {@link Float}
+   */
+  @Override
+  public Float getOutput(MutableFloat accumulatedValue)
+  {
+    return Float.valueOf(accumulatedValue.floatValue());
+  }
+
+  /**
+   * @param value previously emitted sum
+   * @return the negated sum
+   */
+  @Override
+  public Float getRetraction(Float value)
+  {
+    return Float.valueOf(-value.floatValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
new file mode 100644
index 0000000..c9aa4bd
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang.mutable.MutableInt;
+
+/**
+ * Sum accumulation for {@link Integer} tuples; the running total is held in a
+ * {@link MutableInt}.
+ */
+public class SumInt implements Accumulation<Integer, MutableInt, Integer>
+{
+  /**
+   * @return a new accumulator holding 0
+   */
+  @Override
+  public MutableInt defaultAccumulatedValue()
+  {
+    final int zero = 0;
+    return new MutableInt(zero);
+  }
+
+  /**
+   * Adds one tuple to the running total.
+   *
+   * @param accumulatedValue running total, modified in place
+   * @param input value to add
+   * @return the same accumulator instance
+   */
+  @Override
+  public MutableInt accumulate(MutableInt accumulatedValue, Integer input)
+  {
+    accumulatedValue.add(input.intValue());
+    return accumulatedValue;
+  }
+
+  /**
+   * Adds the second total onto the first.
+   *
+   * @param accumulatedValue1 total that receives the sum, modified in place
+   * @param accumulatedValue2 total to add
+   * @return {@code accumulatedValue1}
+   */
+  @Override
+  public MutableInt merge(MutableInt accumulatedValue1, MutableInt accumulatedValue2)
+  {
+    accumulatedValue1.add(accumulatedValue2.intValue());
+    return accumulatedValue1;
+  }
+
+  /**
+   * @param accumulatedValue running total
+   * @return the total as an {@link Integer}
+   */
+  @Override
+  public Integer getOutput(MutableInt accumulatedValue)
+  {
+    return Integer.valueOf(accumulatedValue.intValue());
+  }
+
+  /**
+   * @param value previously emitted sum
+   * @return the negated sum
+   */
+  @Override
+  public Integer getRetraction(Integer value)
+  {
+    return Integer.valueOf(-value.intValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
new file mode 100644
index 0000000..1268141
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang.mutable.MutableLong;
+
+/**
+ * Sum accumulation for {@link Long} tuples; the running total is held in a
+ * {@link MutableLong}.
+ */
+public class SumLong implements Accumulation<Long, MutableLong, Long>
+{
+  /**
+   * @return a new accumulator holding 0
+   */
+  @Override
+  public MutableLong defaultAccumulatedValue()
+  {
+    final long zero = 0L;
+    return new MutableLong(zero);
+  }
+
+  /**
+   * Adds one tuple to the running total.
+   *
+   * @param accumulatedValue running total, modified in place
+   * @param input value to add
+   * @return the same accumulator instance
+   */
+  @Override
+  public MutableLong accumulate(MutableLong accumulatedValue, Long input)
+  {
+    accumulatedValue.add(input.longValue());
+    return accumulatedValue;
+  }
+
+  /**
+   * Adds the second total onto the first.
+   *
+   * @param accumulatedValue1 total that receives the sum, modified in place
+   * @param accumulatedValue2 total to add
+   * @return {@code accumulatedValue1}
+   */
+  @Override
+  public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2)
+  {
+    accumulatedValue1.add(accumulatedValue2.longValue());
+    return accumulatedValue1;
+  }
+
+  /**
+   * @param accumulatedValue running total
+   * @return the total as a {@link Long}
+   */
+  @Override
+  public Long getOutput(MutableLong accumulatedValue)
+  {
+    return Long.valueOf(accumulatedValue.longValue());
+  }
+
+  /**
+   * @param value previously emitted sum
+   * @return the negated sum
+   */
+  @Override
+  public Long getRetraction(Long value)
+  {
+    return Long.valueOf(-value.longValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopN.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopN.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopN.java
new file mode 100644
index 0000000..c95cbb4
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopN.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * TopN accumulation: keeps the {@code n} highest-ranked tuples seen so far,
+ * ordered from highest to lowest.
+ * <p>
+ * Ranking uses the configured {@link Comparator} when one is set, otherwise
+ * the tuples' natural ordering. {@code n} has no explicit default in this
+ * class (it starts at 0), so {@link #setN(int)} must be called or nothing is
+ * retained.
+ *
+ * @param <T> type of the tuples being ranked
+ */
+public class TopN<T> implements Accumulation<T, List<T>, List<T>>
+{
+  int n;
+
+  Comparator<T> comparator;
+
+  /**
+   * @param n maximum number of tuples to keep
+   */
+  public void setN(int n)
+  {
+    this.n = n;
+  }
+
+  /**
+   * @param comparator ordering used to rank tuples; when {@code null} the
+   *        tuples must implement {@link Comparable}
+   */
+  public void setComparator(Comparator<T> comparator)
+  {
+    this.comparator = comparator;
+  }
+
+  /**
+   * @return a new, empty {@link LinkedList}
+   */
+  @Override
+  public List<T> defaultAccumulatedValue()
+  {
+    return new LinkedList<>();
+  }
+
+  /**
+   * Inserts the tuple at its ranked position and drops the lowest-ranked
+   * tuple if the list has grown beyond {@code n}.
+   *
+   * @param accumulatedValue tuples kept so far, highest first; modified in place
+   * @param input tuple to insert
+   * @return the same list instance that was passed in
+   * @throws RuntimeException if no comparator is set, the list is non-empty
+   *         and {@code input} is not {@link Comparable}
+   */
+  @Override
+  public List<T> accumulate(List<T> accumulatedValue, T input)
+  {
+    // Walk down the list until we hit the first tuple the input outranks;
+    // the input is inserted right in front of it.
+    int k = 0;
+    for (T inMemory : accumulatedValue) {
+      if (ranksAbove(input, inMemory)) {
+        break;
+      }
+      k++;
+    }
+    accumulatedValue.add(k, input);
+    if (accumulatedValue.size() > n) {
+      // Remove the tail by index. Passing the element itself would select
+      // remove(Object), which scans for the first equal element instead.
+      accumulatedValue.remove(accumulatedValue.size() - 1);
+    }
+    return accumulatedValue;
+  }
+
+  /**
+   * Combines two accumulated lists and keeps the {@code n} highest-ranked tuples.
+   *
+   * @param accumulatedValue1 list that receives the result; modified in place
+   * @param accumulatedValue2 list whose tuples are merged in
+   * @return {@code accumulatedValue1}, sorted highest first and cut to at most {@code n} tuples
+   */
+  @Override
+  public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2)
+  {
+    accumulatedValue1.addAll(accumulatedValue2);
+    if (comparator != null) {
+      Collections.sort(accumulatedValue1, Collections.reverseOrder(comparator));
+    } else {
+      Collections.sort(accumulatedValue1, Collections.reverseOrder());
+    }
+    if (accumulatedValue1.size() > n) {
+      // Truncate in place rather than returning a subList view, which would
+      // keep the discarded tail reachable through its backing list.
+      accumulatedValue1.subList(n, accumulatedValue1.size()).clear();
+    }
+    return accumulatedValue1;
+  }
+
+  /**
+   * @param accumulatedValue tuples kept so far
+   * @return the accumulated list itself (no copy is made)
+   */
+  @Override
+  public List<T> getOutput(List<T> accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+
+  /**
+   * Retraction is not implemented; an empty list is returned.
+   *
+   * @param accumulatedValue previously emitted output (ignored)
+   * @return a new, empty list
+   */
+  @Override
+  public List<T> getRetraction(List<T> accumulatedValue)
+  {
+    return new LinkedList<>();
+  }
+
+  /**
+   * Tells whether {@code candidate} ranks strictly above {@code existing}.
+   */
+  @SuppressWarnings("unchecked")
+  private boolean ranksAbove(T candidate, T existing)
+  {
+    if (comparator != null) {
+      return comparator.compare(existing, candidate) < 0;
+    }
+    if (candidate instanceof Comparable) {
+      // Safe as long as T's natural ordering accepts T, which callers without a comparator rely on.
+      return ((Comparable<T>)candidate).compareTo(existing) > 0;
+    }
+    throw new RuntimeException("Tuple cannot be compared");
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKey.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKey.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKey.java
new file mode 100644
index 0000000..92a6eb6
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKey.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Generalized TopNByKey accumulation
+ */
+public class TopNByKey<K, V> implements
+    Accumulation<KeyValPair<K, V>, Map<K, V>, List<KeyValPair<K, V>>>
+{
+  int n = 10;
+
+  Comparator<V> comparator;
+
+  public void setN(int n)
+  {
+    this.n = n;
+  }
+
+  public void setComparator(Comparator<V> comparator)
+  {
+    this.comparator = comparator;
+  }
+
+  @Override
+  public Map<K, V> defaultAccumulatedValue()
+  {
+    return new HashMap<>();
+  }
+
+  @Override
+  public Map<K, V> accumulate(Map<K, V> accumulatedValue, KeyValPair<K, V> input)
+  {
+    accumulatedValue.put(input.getKey(), input.getValue());
+    return accumulatedValue;
+  }
+
+  @Override
+  public Map<K, V> merge(Map<K, V> accumulatedValue1, Map<K, V> accumulatedValue2)
+  {
+    for (Map.Entry<K, V> entry : accumulatedValue2.entrySet()) {
+      if (!accumulatedValue1.containsKey(entry.getKey())) {
+        accumulatedValue1.put(entry.getKey(), entry.getValue());
+      } else if (comparator != null) {
+        if (comparator.compare(entry.getValue(), accumulatedValue1.get(entry.getKey())) > 0) {
+          accumulatedValue1.put(entry.getKey(), entry.getValue());
+        }
+      } else if (entry.getValue() instanceof Comparable) {
+        if (((Comparable<V>)entry.getValue()).compareTo(accumulatedValue1.get(entry.getKey())) > 0) {
+          accumulatedValue1.put(entry.getKey(), entry.getValue());
+        }
+      }
+    }
+    return accumulatedValue1;
+  }
+
+  @Override
+  public List<KeyValPair<K, V>> getOutput(Map<K, V> accumulatedValue)
+  {
+    LinkedList<KeyValPair<K, V>> result = new LinkedList<>();
+    for (Map.Entry<K, V> entry : accumulatedValue.entrySet()) {
+      int k = 0;
+      for (KeyValPair<K, V> inMemory : result) {
+        if (comparator != null) {
+          if (comparator.compare(entry.getValue(), inMemory.getValue()) > 0) {
+            break;
+          }
+        } else if (entry.getValue() instanceof Comparable) {
+          if (((Comparable<V>)entry.getValue()).compareTo(inMemory.getValue()) > 0) {
+            break;
+          }
+        }
+        k++;
+      }
+      result.add(k, new KeyValPair<K, V>(entry.getKey(), entry.getValue()));
+      if (result.size() > n) {
+        result.remove(result.get(result.size() - 1));
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public List<KeyValPair<K, V>> getRetraction(List<KeyValPair<K, V>> value)
+  {
+    // TODO: Need to add implementation for retraction.
+    return new LinkedList<>();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Average.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Average.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Average.java
deleted file mode 100644
index 57db6d7..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Average.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang3.tuple.MutablePair;
-
-/**
- * Average Accumulation
- */
-public class Average implements Accumulation<Double, MutablePair<Double, Long>, Double>
-{
-  @Override
-  public MutablePair<Double, Long> defaultAccumulatedValue()
-  {
-    return new MutablePair<>(0.0, 0L);
-  }
-  
-  @Override
-  public MutablePair<Double, Long> accumulate(MutablePair<Double, Long> accu, Double input)
-  {
-    accu.setLeft(accu.getLeft() * ((double)accu.getRight() / (accu.getRight() + 1)) + input / (accu.getRight() + 1));
-    accu.setRight(accu.getRight() + 1);
-    return accu;
-  }
-  
-  @Override
-  public MutablePair<Double, Long> merge(MutablePair<Double, Long> accu1, MutablePair<Double, Long> accu2)
-  {
-    accu1.setLeft(accu1.getLeft() * ((double)accu1.getRight() / accu1.getRight() + accu2.getRight()) +
-        accu2.getLeft() * ((double)accu2.getRight() / accu1.getRight() + accu2.getRight()));
-    accu1.setRight(accu1.getRight() + accu2.getRight());
-    return accu1;
-  }
-  
-  @Override
-  public Double getOutput(MutablePair<Double, Long> accumulatedValue)
-  {
-    return accumulatedValue.getLeft();
-  }
-  
-  @Override
-  public Double getRetraction(Double value)
-  {
-    // TODO: Need to add implementation for retraction.
-    return 0.0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Count.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Count.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Count.java
deleted file mode 100644
index 2c01a0b..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Count.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang3.mutable.MutableLong;
-
-/**
- * Count Accumulation: adds every {@link Long} input to a running total held
- * in a {@link MutableLong}.
- */
-public class Count implements Accumulation<Long, MutableLong, Long>
-{
-
-  /**
-   * @return a new accumulator holding 0
-   */
-  @Override
-  public MutableLong defaultAccumulatedValue()
-  {
-    final int zero = 0;
-    return new MutableLong(zero);
-  }
-
-  /**
-   * Adds the input to the running total.
-   *
-   * @param accumulatedValue running total, modified in place
-   * @param input amount to add
-   * @return the same accumulator instance
-   */
-  @Override
-  public MutableLong accumulate(MutableLong accumulatedValue, Long input)
-  {
-    accumulatedValue.add(input.longValue());
-    return accumulatedValue;
-  }
-
-  /**
-   * Adds the second total onto the first.
-   *
-   * @param accumulatedValue1 total that receives the sum, modified in place
-   * @param accumulatedValue2 total to add
-   * @return {@code accumulatedValue1}
-   */
-  @Override
-  public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2)
-  {
-    accumulatedValue1.add(accumulatedValue2.longValue());
-    return accumulatedValue1;
-  }
-
-  /**
-   * @param accumulatedValue running total
-   * @return the total as a {@link Long}
-   */
-  @Override
-  public Long getOutput(MutableLong accumulatedValue)
-  {
-    Long total = accumulatedValue.getValue();
-    return total;
-  }
-
-  /**
-   * @param value previously emitted total
-   * @return the negated total
-   */
-  @Override
-  public Long getRetraction(Long value)
-  {
-    return Long.valueOf(-value.longValue());
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFn.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFn.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFn.java
deleted file mode 100644
index 5716cad..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFn.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * Fold Accumulation Adaptor class: subclasses supply {@link #fold} and this
- * class routes {@code accumulate} to it. {@code merge} is not implemented
- * here and is left to the subclass.
- *
- * @param <INPUT> type of the incoming tuples
- * @param <OUTPUT> type of both the accumulated value and the output
- */
-public abstract class FoldFn<INPUT, OUTPUT> implements Accumulation<INPUT, OUTPUT, OUTPUT>
-{
-  // Value handed out by defaultAccumulatedValue(); stays null with the no-arg constructor.
-  private OUTPUT initialVal;
-
-  /**
-   * Creates a fold whose initial accumulated value is {@code null}.
-   */
-  public FoldFn()
-  {
-  }
-
-  /**
-   * @param initialVal value returned by {@link #defaultAccumulatedValue()}
-   */
-  public FoldFn(OUTPUT initialVal)
-  {
-    this.initialVal = initialVal;
-  }
-
-  /**
-   * @return the initial value given at construction; the same reference on every call
-   */
-  @Override
-  public OUTPUT defaultAccumulatedValue()
-  {
-    return this.initialVal;
-  }
-
-  /**
-   * @param accumulatedValue result folded so far
-   * @param input next tuple
-   * @return whatever {@link #fold} returns for the two arguments
-   */
-  @Override
-  public OUTPUT accumulate(OUTPUT accumulatedValue, INPUT input)
-  {
-    OUTPUT folded = fold(accumulatedValue, input);
-    return folded;
-  }
-
-  /**
-   * @param accumulatedValue result folded so far
-   * @return the accumulated value, unchanged
-   */
-  @Override
-  public OUTPUT getOutput(OUTPUT accumulatedValue)
-  {
-    return accumulatedValue;
-  }
-
-  /**
-   * Retraction is not supported.
-   *
-   * @param value previously emitted output (ignored)
-   * @return always {@code null}
-   */
-  @Override
-  public OUTPUT getRetraction(OUTPUT value)
-  {
-    return null;
-  }
-
-  /**
-   * Combines the result so far with the next input.
-   *
-   * @param result result folded so far
-   * @param input next tuple
-   * @return the new result
-   */
-  abstract OUTPUT fold(OUTPUT result, INPUT input);
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Group.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Group.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Group.java
deleted file mode 100644
index 632cad5..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Group.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * Group accumulation: collects every tuple into a {@link List}.
- *
- * @param <T> type of the tuples being grouped
- */
-public class Group<T> implements Accumulation<T, List<T>, List<T>>
-{
-  /**
-   * @return a new, empty {@link ArrayList}
-   */
-  @Override
-  public List<T> defaultAccumulatedValue()
-  {
-    List<T> empty = new ArrayList<>();
-    return empty;
-  }
-
-  /**
-   * Appends the tuple to the group.
-   *
-   * @param accumulatedValue tuples collected so far, modified in place
-   * @param input tuple to append
-   * @return the same list instance that was passed in
-   */
-  @Override
-  public List<T> accumulate(List<T> accumulatedValue, T input)
-  {
-    accumulatedValue.add(input);
-    return accumulatedValue;
-  }
-
-  /**
-   * Appends all tuples of the second group to the first.
-   *
-   * @param accumulatedValue1 list that receives the tuples, modified in place
-   * @param accumulatedValue2 list whose tuples are appended
-   * @return {@code accumulatedValue1}
-   */
-  @Override
-  public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2)
-  {
-    List<T> combined = accumulatedValue1;
-    combined.addAll(accumulatedValue2);
-    return combined;
-  }
-
-  /**
-   * @param accumulatedValue tuples collected so far
-   * @return the accumulated list itself (no copy is made)
-   */
-  @Override
-  public List<T> getOutput(List<T> accumulatedValue)
-  {
-    return accumulatedValue;
-  }
-
-  /**
-   * Retraction is not implemented; an empty list is returned.
-   *
-   * @param value previously emitted output (ignored)
-   * @return a new, empty list
-   */
-  @Override
-  public List<T> getRetraction(List<T> value)
-  {
-    // TODO: Need to add implementation for retraction.
-    List<T> nothing = new ArrayList<>();
-    return nothing;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Max.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Max.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Max.java
deleted file mode 100644
index 1002b49..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Max.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.Comparator;
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * Max accumulation: keeps the largest tuple seen so far, using the configured
- * {@link Comparator} when one is set and the tuples' natural ordering otherwise.
- * A {@code null} accumulated value means no tuple has been seen yet.
- *
- * @param <T> type of the tuples being compared
- */
-public class Max<T> implements Accumulation<T, T, T>
-{
-
-  Comparator<T> comparator;
-
-  /**
-   * @param comparator ordering used to compare tuples; when {@code null} the
-   *        tuples must implement {@link Comparable}
-   */
-  public void setComparator(Comparator<T> comparator)
-  {
-    this.comparator = comparator;
-  }
-
-  /**
-   * @return {@code null}, meaning no tuple has been seen yet
-   */
-  @Override
-  public T defaultAccumulatedValue()
-  {
-    return null;
-  }
-
-  /**
-   * Returns the larger of the current maximum and the input.
-   *
-   * @param accumulatedValue current maximum, or {@code null} if none yet
-   * @param input tuple to compare against the current maximum
-   * @return {@code input} if it is strictly greater (or there is no maximum yet),
-   *         otherwise {@code accumulatedValue}
-   * @throws RuntimeException if no comparator is set and {@code input} is not {@link Comparable}
-   */
-  @Override
-  @SuppressWarnings("unchecked")
-  public T accumulate(T accumulatedValue, T input)
-  {
-    if (accumulatedValue == null) {
-      return input;
-    } else if (comparator != null) {
-      return (comparator.compare(input, accumulatedValue) > 0) ? input : accumulatedValue;
-    } else if (input instanceof Comparable) {
-      return (((Comparable<T>)input).compareTo(accumulatedValue) > 0) ? input : accumulatedValue;
-    } else {
-      throw new RuntimeException("Tuple cannot be compared");
-    }
-  }
-
-  /**
-   * Returns the larger of two accumulated maxima.
-   *
-   * @param accumulatedValue1 first maximum, or {@code null} if empty
-   * @param accumulatedValue2 second maximum, or {@code null} if empty
-   * @return the larger value; the non-null one if either side is empty
-   */
-  @Override
-  public T merge(T accumulatedValue1, T accumulatedValue2)
-  {
-    if (accumulatedValue2 == null) {
-      // An empty right-hand side carries no candidate; comparing against it
-      // would fail instead of leaving the left-hand maximum in place.
-      return accumulatedValue1;
-    }
-    return accumulate(accumulatedValue1, accumulatedValue2);
-  }
-
-  /**
-   * @param accumulatedValue current maximum
-   * @return the accumulated value, unchanged
-   */
-  @Override
-  public T getOutput(T accumulatedValue)
-  {
-    return accumulatedValue;
-  }
-
-  /**
-   * Retraction is not implemented.
-   *
-   * @param value previously emitted output (ignored)
-   * @return always {@code null}
-   */
-  @Override
-  public T getRetraction(T value)
-  {
-    // TODO: Need to add implementation for retraction.
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Min.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Min.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Min.java
deleted file mode 100644
index 66248f4..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Min.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.Comparator;
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * Min accumulation: keeps the smallest tuple seen so far, using the configured
- * {@link Comparator} when one is set and the tuples' natural ordering otherwise.
- * A {@code null} accumulated value means no tuple has been seen yet.
- *
- * @param <T> type of the tuples being compared
- */
-public class Min<T> implements Accumulation<T, T, T>
-{
-
-  Comparator<T> comparator;
-
-  /**
-   * @param comparator ordering used to compare tuples; when {@code null} the
-   *        tuples must implement {@link Comparable}
-   */
-  public void setComparator(Comparator<T> comparator)
-  {
-    this.comparator = comparator;
-  }
-
-  /**
-   * @return {@code null}, meaning no tuple has been seen yet
-   */
-  @Override
-  public T defaultAccumulatedValue()
-  {
-    return null;
-  }
-
-  /**
-   * Returns the smaller of the current minimum and the input.
-   *
-   * @param accumulatedValue current minimum, or {@code null} if none yet
-   * @param input tuple to compare against the current minimum
-   * @return {@code input} if it is strictly smaller (or there is no minimum yet),
-   *         otherwise {@code accumulatedValue}
-   * @throws RuntimeException if no comparator is set and {@code input} is not {@link Comparable}
-   */
-  @Override
-  @SuppressWarnings("unchecked")
-  public T accumulate(T accumulatedValue, T input)
-  {
-    if (accumulatedValue == null) {
-      return input;
-    } else if (comparator != null) {
-      return (comparator.compare(input, accumulatedValue) < 0) ? input : accumulatedValue;
-    } else if (input instanceof Comparable) {
-      return (((Comparable<T>)input).compareTo(accumulatedValue) < 0) ? input : accumulatedValue;
-    } else {
-      throw new RuntimeException("Tuple cannot be compared");
-    }
-  }
-
-  /**
-   * Returns the smaller of two accumulated minima.
-   *
-   * @param accumulatedValue1 first minimum, or {@code null} if empty
-   * @param accumulatedValue2 second minimum, or {@code null} if empty
-   * @return the smaller value; the non-null one if either side is empty
-   */
-  @Override
-  public T merge(T accumulatedValue1, T accumulatedValue2)
-  {
-    if (accumulatedValue2 == null) {
-      // An empty right-hand side carries no candidate; comparing against it
-      // would fail instead of leaving the left-hand minimum in place.
-      return accumulatedValue1;
-    }
-    return accumulate(accumulatedValue1, accumulatedValue2);
-  }
-
-  /**
-   * @param accumulatedValue current minimum
-   * @return the accumulated value, unchanged
-   */
-  @Override
-  public T getOutput(T accumulatedValue)
-  {
-    return accumulatedValue;
-  }
-
-  /**
-   * Retraction is not implemented.
-   *
-   * @param value previously emitted output (ignored)
-   * @return always {@code null}
-   */
-  @Override
-  public T getRetraction(T value)
-  {
-    // TODO: Need to add implementation for retraction.
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFn.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFn.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFn.java
deleted file mode 100644
index c21ab32..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFn.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * An easy to use reduce Accumulation
- * @param <INPUT>
- */
-public abstract class ReduceFn<INPUT> implements Accumulation<INPUT, INPUT, INPUT>
-{
-  @Override
-  public INPUT defaultAccumulatedValue()
-  {
-    return null;
-  }
-
-  @Override
-  public INPUT accumulate(INPUT accumulatedValue, INPUT input)
-  {
-    if (accumulatedValue == null) {
-      return input;
-    }
-    return reduce(accumulatedValue, input);
-  }
-
-  @Override
-  public INPUT merge(INPUT accumulatedValue1, INPUT accumulatedValue2)
-  {
-    return reduce(accumulatedValue1, accumulatedValue2);
-  }
-
-  @Override
-  public INPUT getOutput(INPUT accumulatedValue)
-  {
-    return accumulatedValue;
-  }
-
-  @Override
-  public INPUT getRetraction(INPUT value)
-  {
-    return null;
-  }
-
-  public abstract INPUT reduce(INPUT input1, INPUT input2);
-
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicates.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicates.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicates.java
deleted file mode 100644
index b7cd770..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicates.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * RemoveDuplicates Accumulation.
- * @param <T>
- */
-public class RemoveDuplicates<T> implements Accumulation<T, Set<T>, List<T>>
-{
-  @Override
-  public Set<T> defaultAccumulatedValue()
-  {
-    return new HashSet<>();
-  }
-  
-  @Override
-  public Set<T> accumulate(Set<T> accumulatedValue, T input)
-  {
-    accumulatedValue.add(input);
-    return accumulatedValue;
-  }
-  
-  @Override
-  public Set<T> merge(Set<T> accumulatedValue1, Set<T> accumulatedValue2)
-  {
-    for (T item : accumulatedValue2) {
-      accumulatedValue1.add(item);
-    }
-    return accumulatedValue1;
-  }
-  
-  @Override
-  public List<T> getOutput(Set<T> accumulatedValue)
-  {
-    if (accumulatedValue == null) {
-      return new ArrayList<>();
-    } else {
-      return new ArrayList<>(accumulatedValue);
-    }
-  }
-  
-  @Override
-  public List<T> getRetraction(List<T> value)
-  {
-    // TODO: Need to add implementation for retraction.
-    return new ArrayList<>(value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumDouble.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumDouble.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumDouble.java
deleted file mode 100644
index 60b195b..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumDouble.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang.mutable.MutableDouble;
-
-/**
- * Sum Accumulation for doubles.
- */
-public class SumDouble implements Accumulation<Double, MutableDouble, Double>
-{
-  @Override
-  public MutableDouble defaultAccumulatedValue()
-  {
-    return new MutableDouble(0.0);
-  }
-  
-  @Override
-  public MutableDouble accumulate(MutableDouble accumulatedValue, Double input)
-  {
-    accumulatedValue.add(input);
-    return accumulatedValue;
-  }
-  
-  @Override
-  public MutableDouble merge(MutableDouble accumulatedValue1, MutableDouble accumulatedValue2)
-  {
-    accumulatedValue1.add(accumulatedValue2);
-    return accumulatedValue1;
-  }
-  
-  @Override
-  public Double getOutput(MutableDouble accumulatedValue)
-  {
-    return accumulatedValue.doubleValue();
-  }
-  
-  @Override
-  public Double getRetraction(Double value)
-  {
-    return -value;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumFloat.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumFloat.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumFloat.java
deleted file mode 100644
index 14e69e2..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumFloat.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang.mutable.MutableFloat;
-
-/**
- * Sum Accumulation for floats.
- */
-public class SumFloat implements Accumulation<Float, MutableFloat, Float>
-{
-  @Override
-  public MutableFloat defaultAccumulatedValue()
-  {
-    return new MutableFloat(0.);
-  }
-  
-  @Override
-  public MutableFloat accumulate(MutableFloat accumulatedValue, Float input)
-  {
-    accumulatedValue.add(input);
-    return accumulatedValue;
-  }
-  
-  @Override
-  public MutableFloat merge(MutableFloat accumulatedValue1, MutableFloat accumulatedValue2)
-  {
-    accumulatedValue1.add(accumulatedValue2);
-    return accumulatedValue1;
-  }
-  
-  @Override
-  public Float getOutput(MutableFloat accumulatedValue)
-  {
-    return accumulatedValue.floatValue();
-  }
-  
-  @Override
-  public Float getRetraction(Float value)
-  {
-    return -value;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumInt.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumInt.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumInt.java
deleted file mode 100644
index 886a7d0..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumInt.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang.mutable.MutableInt;
-
-/**
- * Sum accumulation for integers.
- */
-public class SumInt implements Accumulation<Integer, MutableInt, Integer>
-{
-  @Override
-  public MutableInt defaultAccumulatedValue()
-  {
-    return new MutableInt(0);
-  }
-  
-  @Override
-  public MutableInt accumulate(MutableInt accumulatedValue, Integer input)
-  {
-    accumulatedValue.add(input);
-    return accumulatedValue;
-  }
-  
-  @Override
-  public MutableInt merge(MutableInt accumulatedValue1, MutableInt accumulatedValue2)
-  {
-    accumulatedValue1.add(accumulatedValue2);
-    return accumulatedValue1;
-  }
-  
-  @Override
-  public Integer getOutput(MutableInt accumulatedValue)
-  {
-    return accumulatedValue.intValue();
-  }
-  
-  @Override
-  public Integer getRetraction(Integer value)
-  {
-    return -value;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumLong.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumLong.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumLong.java
deleted file mode 100644
index 469eef9..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumLong.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang.mutable.MutableLong;
-
-/**
- * Sum accumulation for longs.
- */
-public class SumLong implements Accumulation<Long, MutableLong, Long>
-{
-  @Override
-  public MutableLong defaultAccumulatedValue()
-  {
-    return new MutableLong(0L);
-  }
-  
-  @Override
-  public MutableLong accumulate(MutableLong accumulatedValue, Long input)
-  {
-    accumulatedValue.add(input);
-    return accumulatedValue;
-  }
-  
-  @Override
-  public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2)
-  {
-    accumulatedValue1.add(accumulatedValue2);
-    return accumulatedValue1;
-  }
-  
-  @Override
-  public Long getOutput(MutableLong accumulatedValue)
-  {
-    return accumulatedValue.longValue();
-  }
-  
-  @Override
-  public Long getRetraction(Long value)
-  {
-    return -value;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopN.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopN.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopN.java
deleted file mode 100644
index 7dad8cc..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopN.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * TopN accumulation
- */
-public class TopN<T> implements Accumulation<T, List<T>, List<T>>
-{
-  int n;
-
-  Comparator<T> comparator;
-
-  public void setN(int n)
-  {
-    this.n = n;
-  }
-
-  public void setComparator(Comparator<T> comparator)
-  {
-    this.comparator = comparator;
-  }
-
-  @Override
-  public List<T> defaultAccumulatedValue()
-  {
-    return new LinkedList<>();
-  }
-
-  @Override
-  public List<T> accumulate(List<T> accumulatedValue, T input)
-  {
-    int k = 0;
-    for (T inMemory : accumulatedValue) {
-      if (comparator != null) {
-        if (comparator.compare(inMemory, input) < 0) {
-          break;
-        }
-      } else if (input instanceof Comparable) {
-        if (((Comparable<T>)input).compareTo(inMemory) > 0) {
-          break;
-        }
-      } else {
-        throw new RuntimeException("Tuple cannot be compared");
-      }
-      k++;
-    }
-    accumulatedValue.add(k, input);
-    if (accumulatedValue.size() > n) {
-      accumulatedValue.remove(accumulatedValue.get(accumulatedValue.size() - 1));
-    }
-    return accumulatedValue;
-  }
-
-  @Override
-  public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2)
-  {
-    accumulatedValue1.addAll(accumulatedValue2);
-    if (comparator != null) {
-      Collections.sort(accumulatedValue1, Collections.reverseOrder(comparator));
-    } else {
-      Collections.sort(accumulatedValue1, Collections.reverseOrder());
-    }
-    if (accumulatedValue1.size() > n) {
-      return accumulatedValue1.subList(0, n);
-    } else {
-      return accumulatedValue1;
-    }
-  }
-
-  @Override
-  public List<T> getOutput(List<T> accumulatedValue)
-  {
-    return accumulatedValue;
-  }
-
-  @Override
-  public List<T> getRetraction(List<T> accumulatedValue)
-  {
-    return new LinkedList<>();
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKey.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKey.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKey.java
deleted file mode 100644
index d9f9cfd..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKey.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- * Generalized TopNByKey accumulation
- */
-public class TopNByKey<K, V> implements
-    Accumulation<KeyValPair<K, V>, Map<K, V>, List<KeyValPair<K, V>>>
-{
-  int n = 10;
-
-  Comparator<V> comparator;
-
-  public void setN(int n)
-  {
-    this.n = n;
-  }
-
-  public void setComparator(Comparator<V> comparator)
-  {
-    this.comparator = comparator;
-  }
-
-  @Override
-  public Map<K, V> defaultAccumulatedValue()
-  {
-    return new HashMap<>();
-  }
-
-  @Override
-  public Map<K, V> accumulate(Map<K, V> accumulatedValue, KeyValPair<K, V> input)
-  {
-    accumulatedValue.put(input.getKey(), input.getValue());
-    return accumulatedValue;
-  }
-
-  @Override
-  public Map<K, V> merge(Map<K, V> accumulatedValue1, Map<K, V> accumulatedValue2)
-  {
-    for (Map.Entry<K, V> entry : accumulatedValue2.entrySet()) {
-      if (!accumulatedValue1.containsKey(entry.getKey())) {
-        accumulatedValue1.put(entry.getKey(), entry.getValue());
-      } else if (comparator != null) {
-        if (comparator.compare(entry.getValue(), accumulatedValue1.get(entry.getKey())) > 0) {
-          accumulatedValue1.put(entry.getKey(), entry.getValue());
-        }
-      } else if (entry.getValue() instanceof Comparable) {
-        if (((Comparable<V>)entry.getValue()).compareTo(accumulatedValue1.get(entry.getKey())) > 0) {
-          accumulatedValue1.put(entry.getKey(), entry.getValue());
-        }
-      }
-    }
-    return accumulatedValue1;
-  }
-
-  @Override
-  public List<KeyValPair<K, V>> getOutput(Map<K, V> accumulatedValue)
-  {
-    LinkedList<KeyValPair<K, V>> result = new LinkedList<>();
-    for (Map.Entry<K, V> entry : accumulatedValue.entrySet()) {
-      int k = 0;
-      for (KeyValPair<K, V> inMemory : result) {
-        if (comparator != null) {
-          if (comparator.compare(entry.getValue(), inMemory.getValue()) > 0) {
-            break;
-          }
-        } else if (entry.getValue() instanceof Comparable) {
-          if (((Comparable<V>)entry.getValue()).compareTo(inMemory.getValue()) > 0) {
-            break;
-          }
-        }
-        k++;
-      }
-      result.add(k, new KeyValPair<K, V>(entry.getKey(), entry.getValue()));
-      if (result.size() > n) {
-        result.remove(result.get(result.size() - 1));
-      }
-    }
-    return result;
-  }
-
-  @Override
-  public List<KeyValPair<K, V>> getRetraction(List<KeyValPair<K, V>> value)
-  {
-    // TODO: Need to add implementation for retraction.
-    return new LinkedList<>();
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/AverageTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/AverageTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/AverageTest.java
new file mode 100644
index 0000000..e5fd541
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/AverageTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.commons.lang3.tuple.MutablePair;
+
+/**
+ * Test for {@link Average}.
+ */
+public class AverageTest
+{
+  @Test
+  public void AverageTest()
+  {
+    Average ave = new Average();
+    MutablePair<Double, Long> accu = ave.defaultAccumulatedValue();
+    
+    for (int i = 1; i <= 10; i++) {
+      accu = ave.accumulate(accu, (double)i);
+    }
+    Assert.assertTrue(5.5 == accu.getLeft());
+  }
+}