You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/09/01 18:30:55 UTC

[11/12] apex-malhar git commit: Updated algo & working on math operators

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyValueMap.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyValueMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyValueMap.java
new file mode 100644
index 0000000..78eb6d9
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyValueMap.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.HashMap;
+
+import com.datatorrent.api.DefaultOutputPort;
+
+import com.datatorrent.lib.util.AbstractBaseFrequentKeyValueMap;
+
+/**
+ * This operator filters the incoming stream of key value pairs by finding the value or values (if there is a tie),
+ * for each key, that occur the fewest number of times within each window. 
+ * Each key and its corresponding least values are emitted at the end of each window.
+ * <p>
+ * Occurrences of all values for each key is counted and at the end of window the least frequent values are emitted on output port least per key.
+ * </p>
+ * <p>
+ * This module is an end of window module<br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>data</b>: expects Map&lt;K,V&gt;<br>
+ * <b>least</b>: Output port, emits HashMap&lt;K,HashMap&lt;V,Integer&gt;&gt;(1)<br>
+ * <br>
+ * <b>Properties</b>: None<br>
+ * <br>
+ * <b>Compile time checks</b>: None<br>
+ * <b>Specific run time checks</b>: None <br>
+ * <br>
+ * <b>Benchmarks</b>: Blast as many tuples as possible in inline mode<br>
+ * <table border="1" cellspacing=1 cellpadding=1 summary="Benchmark table for LeastFrequentKeyValueMap&lt;K,V&gt; operator template">
+ * <tr><th>In-Bound</th><th>Out-bound</th><th>Comments</th></tr>
+ * <tr><td><b>&gt; 30 Million K,V pairs/s</b></td><td>Emits only 1 tuple per window per key</td><td>In-bound throughput is the main determinant of performance.
+ * The benchmark was done with immutable objects. If K or V are mutable the benchmark may be lower</td></tr>
+ * </table><br>
+ * </p>
+ * <p>
+ * <b>Function Table (K=String,V=Integer);</b>:
+ * <table border="1" cellspacing=1 cellpadding=1 summary="Function table for LeastFrequentKeyValueMap&lt;K,V&gt; operator template">
+ * <tr><th rowspan=2>Tuple Type (api)</th><th>In-bound (process)</th><th>Out-bound (emit)</th></tr>
+ * <tr><th><i>data</i>(Map&lt;K,V&gt;)</th><th><i>least</i>(HashMap&lt;K,HashMap&lt;V,Integer&gt;&gt;)</th></tr>
+ * <tr><td>Begin Window (beginWindow())</td><td>N/A</td><td>N/A</td></tr>
+ * <tr><td>Data (process())</td><td>{a=1,b=5,c=110}</td><td></td></tr>
+ * <tr><td>Data (process())</td><td>{a=55,c=2000,b=45}</td><td></td></tr>
+ * <tr><td>Data (process())</td><td>{d=2}</td><td></td></tr>
+ * <tr><td>Data (process())</td><td>{a=55,b=5,c=22}</td><td></td></tr>
+ * <tr><td>Data (process())</td><td>{h=20,a=2,z=5}</td><td></td></tr>
+ * <tr><td>Data (process())</td><td>{a=4,c=110}</td><td></td></tr>
+ * <tr><td>Data (process())</td><td>{a=4,z=5}</td><td></td></tr>
+ * <tr><td>End Window (endWindow())</td><td>N/A</td><td>{a={1=1,2=1},b={45=1},c={2000=1,22=1},d={2=1},h={20=1},z={5=2}}</td></tr>
+ * </table>
+ * <br>
+ * <br>
+ * </p>
+ *
+ * @displayName Emit Least Frequent Keyval Pair
+ * @category Rules and Alerts
+ * @tags filter, key value, count
+ * @deprecated
+ * @since 0.3.2
+ */
+@Deprecated
+public class LeastFrequentKeyValueMap<K, V> extends AbstractBaseFrequentKeyValueMap<K, V>
+{
+  /**
+   * Output port named "least"; receives every map handed to {@link #emitTuple(HashMap)}.
+   */
+  public final transient DefaultOutputPort<HashMap<K, HashMap<V, Integer>>> least = new DefaultOutputPort<HashMap<K, HashMap<V, Integer>>>();
+
+  /**
+   * Forwards the given map, unchanged, to the "least" output port.
+   * @param tuple map to emit on port "least"
+   */
+  @Override
+  public void emitTuple(HashMap<K, HashMap<V, Integer>> tuple)
+  {
+    least.emit(tuple);
+  }
+
+  /**
+   * Strict "less than" test.
+   * @param val1 first value to compare
+   * @param val2 second value to compare
+   * @return true only when val1 is smaller than val2; false on a tie
+   */
+  @Override
+  public boolean compareValue(int val1, int val2)
+  {
+    return val2 > val1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyMap.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyMap.java
new file mode 100644
index 0000000..f1ab968
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyMap.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+
+import com.datatorrent.lib.util.AbstractBaseFrequentKey;
+import com.datatorrent.lib.util.UnifierArrayHashMapFrequent;
+import com.datatorrent.lib.util.UnifierHashMapFrequent;
+
+/**
+ * This operator filters the incoming stream of key value pairs by finding the key or keys (if there is a tie)
+ * that occur the largest number of times within each window.&nbsp;
+ * A list of the corresponding key value pairs are then output to the port named "list" and one of the corresponding key value pairs is output to the port "most", at the end of each window.
+ * <p>
+ * Occurrences of each key are counted and at the end of window any one of the most frequent keys is emitted on output port most and all most frequent
+ * keys on output port list.
+ * </p>
+ * <p>
+ * This module is an end of window module. In case of a tie any one of the most frequent keys would be emitted. The list port would however have all the tied keys<br>
+ * <br>
+ *  <b>StateFull : Yes</b>, Values are compared all over  application window can be > 1. <br>
+ *  <b>Partitions : Yes</b>, Result is unified on output port. <br>
+ *  <br>
+ * <b>Ports</b>:<br>
+ * <b>data</b>: expects Map&lt;K,V&gt;, V is ignored/not used<br>
+ * <b>most</b>: emits HashMap&lt;K,Integer&gt;(1); where K is the most frequent key, and Integer is the number of its occurrences in the window<br>
+ * <b>list</b>: emits ArrayList&lt;HashMap&lt;K,Integer&gt;(1)&gt;; where the list includes all the keys that are most frequent<br>
+ * <br>
+ * </p>
+ *
+ * @displayName Emit Most Frequent Key
+ * @category Rules and Alerts
+ * @tags filter, key value, count
+ *
+ * @since 0.3.2
+ * @deprecated
+ */
+@Deprecated
+@OperatorAnnotation(partitionable = true)
+public class MostFrequentKeyMap<K,V> extends AbstractBaseFrequentKey<K>
+{
+  /**
+   * The input port which receives incoming key value pairs.
+   */
+  public final transient DefaultInputPort<Map<K,V>> data = new DefaultInputPort<Map<K,V>>()
+  {
+    /**
+     * Calls processTuple(key) once for each key of the map; the values are not read.
+     */
+    @Override
+    public void process(Map<K,V> tuple)
+    {
+      for (K key : tuple.keySet()) {
+        processTuple(key);
+      }
+    }
+  };
+  /**
+   * The output port on which the tuples passed to emitTuple are emitted.
+   * Its unifier is a UnifierHashMapFrequent
+   * switched to "most" mode through setLeast(false).
+   */
+  public final transient DefaultOutputPort<HashMap<K, Integer>> most = new DefaultOutputPort<HashMap<K, Integer>>()
+  {
+    @Override
+    public Unifier<HashMap<K, Integer>> getUnifier()
+    {
+      UnifierHashMapFrequent<K> ret = new UnifierHashMapFrequent<K>();
+      ret.setLeast(false);
+      return ret;
+    }
+  };
+
+  /** The output port on which the lists passed to emitList are emitted. */
+  public final transient DefaultOutputPort<ArrayList<HashMap<K, Integer>>> list = new DefaultOutputPort<ArrayList<HashMap<K, Integer>>>()
+  {
+    // Configure the list unifier through its own type: a cast to UnifierHashMapFrequent can never succeed.
+    @Override
+    public Unifier<ArrayList<HashMap<K, Integer>>> getUnifier()
+    {
+      UnifierArrayHashMapFrequent<K> ret = new UnifierArrayHashMapFrequent<K>();
+      ret.setLeast(false);
+      return ret;
+    }
+  };
+
+
+  /**
+   * Emits tuple on port "most"
+   * @param tuple map to emit on port "most"
+   */
+  @Override
+  public void emitTuple(HashMap<K, Integer> tuple)
+  {
+    most.emit(tuple);
+  }
+
+  /**
+   * Emits tuple on port "list"
+   * @param tlist list of maps to emit on port "list"
+   */
+  @Override
+  public void emitList(ArrayList<HashMap<K, Integer>> tlist)
+  {
+    list.emit(tlist);
+  }
+
+  /**
+   * returns val1 > val2
+   * @param val1 first count to compare
+   * @param val2 second count to compare
+   * @return val1 > val2
+   */
+  @Override
+  public boolean compareCount(int val1, int val2)
+  {
+    return val1 > val2;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyValueMap.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyValueMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyValueMap.java
new file mode 100644
index 0000000..4fb6472
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyValueMap.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.HashMap;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+
+import com.datatorrent.lib.util.AbstractBaseFrequentKeyValueMap;
+
+/**
+ * This operator filters the incoming stream of key value pairs by finding the value or values (if there is a tie),
+ * for each key, that occur the largest number of times within each window.&nbsp;
+ * Each key and its corresponding most values are emitted at the end of each window.
+ * <p>
+ * Occurrences of all values for each key are counted and at the end of window the most frequent values are emitted on output port most per key
+ * </p>
+ * <p>
+ * This module is an end of window module<br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>data</b>: expects HashMap&lt;K,V&gt;<br>
+ * <b>most</b>: emits HashMap&lt;K, HashMap&lt;V, Integer&gt;&gt;(1)<br>
+ * <br>
+ * <br>
+ * <b>Properties</b>: None<br>
+ * <br>
+ * <b>Compile time checks</b>: None<br>
+ * <b>Specific run time checks</b>: None <br>
+ * <br>
+ * <b>Benchmarks</b>: Blast as many tuples as possible in inline mode<br>
+ * <table border="1" cellspacing=1 cellpadding=1 summary="Benchmark table for MostFrequentKeyValueMap&lt;K,V&gt; operator template">
+ * <tr><th>In-Bound</th><th>Out-bound</th><th>Comments</th></tr>
+ * <tr><td><b>&gt; 30 Million K,V pairs/s</b></td><td>Emits only 1 tuple per window per key</td><td>In-bound throughput is the main determinant of performance.
+ * The benchmark was done with immutable objects. If K or V are mutable the benchmark may be lower</td></tr>
+ * </table><br>
+ * </p>
+ * <p>
+ * <b>Function Table (K=String,V=Integer);</b>:
+ * <table border="1" cellspacing=1 cellpadding=1 summary="Function table for MostFrequentKeyValueMap&lt;K,V&gt; operator template">
+ * <tr><th rowspan=2>Tuple Type (api)</th><th>In-bound (process)</th><th>Out-bound (emit)</th></tr>
+ * <tr><th><i>data</i>(HashMap&lt;K,V&gt;)</th><th><i>most</i>(HashMap&lt;K,HashMap&lt;V,Integer&gt;&gt;)</th></tr>
+ * <tr><td>Begin Window (beginWindow())</td><td>N/A</td><td>N/A</td></tr>
+ * <tr><td>Data (process())</td><td>{a=1,b=5,c=110}</td><td></td></tr>
+ * <tr><td>Data (process())</td><td>{a=55,c=2000,b=45}</td><td></td></tr>
+ * <tr><td>Data (process())</td><td>{d=2}</td><td></td></tr>
+ * <tr><td>Data (process())</td><td>{a=55,b=5,c=22}</td><td></td></tr>
+ * <tr><td>Data (process())</td><td>{h=20,a=2,z=5}</td><td></td></tr>
+ * <tr><td>Data (process())</td><td>{a=4,c=110}</td><td></td></tr>
+ * <tr><td>Data (process())</td><td>{a=4,z=5}</td><td></td></tr>
+ * <tr><td>End Window (endWindow())</td><td>N/A</td><td>{a={4=2,55=2},b={5=2},c={110=2},d={2=1},h={20=1},z={5=2}}</td></tr>
+ * </table>
+ * <br>
+ * <br>
+ * </p>
+ *
+ * @displayName Emit Most Frequent Keyval Pair
+ * @category Rules and Alerts
+ * @tags filter, key value, count
+ * @deprecated
+ * @since 0.3.2
+ */
+@Deprecated
+@OperatorAnnotation(partitionable = false)
+public class MostFrequentKeyValueMap<K, V> extends AbstractBaseFrequentKeyValueMap<K, V>
+{
+  /**
+   * Output port named "most"; receives every map handed to {@link #emitTuple(HashMap)}.
+   */
+  public final transient DefaultOutputPort<HashMap<K, HashMap<V, Integer>>> most = new DefaultOutputPort<HashMap<K, HashMap<V, Integer>>>();
+
+  /**
+   * Forwards the given map, unchanged, to the "most" output port.
+   * @param tuple map to emit on port "most"
+   */
+  @Override
+  public void emitTuple(HashMap<K, HashMap<V, Integer>> tuple)
+  {
+    most.emit(tuple);
+  }
+
+  /**
+   * Strict "greater than" test.
+   * @param val1 first value to compare
+   * @param val2 second value to compare
+   * @return true only when val1 is larger than val2; false on a tie
+   */
+  @Override
+  public boolean compareValue(int val1, int val2)
+  {
+    return val2 < val1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/Sampler.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/Sampler.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/Sampler.java
new file mode 100644
index 0000000..7caf523
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/Sampler.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.Random;
+
+import javax.validation.constraints.Max;
+import javax.validation.constraints.Min;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.Stateless;
+
+import com.datatorrent.lib.util.BaseKeyOperator;
+
+/**
+ * This operator takes a stream of tuples as input, and emits each tuple with a specified probability.
+ * <p>
+ * Emits the tuple as per probability of pass rate out of total rate. <br>
+ * <br>
+ * An efficient filter to allow sample analysis of a stream. Very useful if the incoming stream has high throughput.
+ * </p>
+ * <p>
+ * <br>
+ * <b> StateFull : No, </b> tuple is processed in current window. <br>
+ * <b> Partitions : Yes. </b> No state dependency among input tuples. <br>
+ * <b>Ports</b>:<br>
+ * <b>data</b>: expects K<br>
+ * <b>sample</b>: emits K<br>
+ * <br>
+ * <b>Properties</b>:<br>
+ * <b>samplingPercentage</b>: Probability with which each tuple is emitted. Default is 1.0<br>
+ * It is expected to be a number between 0 and 1 inclusive<br>
+ * <br>
+ * <b>Specific compile time checks are</b>: None<br>
+ * <b>Specific validation checks are</b>:<br>
+ * samplingPercentage is annotated with {@code @Min(0)} and {@code @Max(1)}<br>
+ * a value of 1 turns this operator into a passthrough (all), which is useful during testing<br>
+ * <br>
+ * <b>Specific run time checks are</b>: None<br>
+ * <br>
+ * </p>
+ *
+ * @displayName Sampler
+ * @category Stats and Aggregations
+ * @tags filter
+ *
+ * @since 0.3.2
+ * @deprecated
+ */
+@Deprecated
+@Stateless
+@OperatorAnnotation(partitionable = true)
+public class Sampler<K> extends BaseKeyOperator<K>
+{
+  /**
+   * This is the input port which receives tuples.
+   */
+  public final transient DefaultInputPort<K> data = new DefaultInputPort<K>()
+  {
+    /**
+     * Emits cloneKey(tuple) with probability samplingPercentage, otherwise drops the tuple.
+     */
+    @Override
+    public void process(K tuple)
+    {
+      // nextDouble() lies in [0.0, 1.0): ">=" lets 0 never emit while 1 still always emits.
+      if (random.nextDouble() >= samplingPercentage) {
+        return;
+      }
+      sample.emit(cloneKey(tuple));
+    }
+  };
+
+  /**
+   * This is the output port which emits the sampled tuples.
+   */
+  public final transient DefaultOutputPort<K> sample = new DefaultOutputPort<K>();
+
+  @Min(0)
+  @Max(1)
+  private double samplingPercentage = 1.0;
+
+  private transient Random random = new Random();
+
+  /**
+   * Gets the samplingPercentage.
+   * @return the probability with which a tuple is emitted
+   */
+  public double getSamplingPercentage()
+  {
+    return samplingPercentage;
+  }
+
+  /**
+   * The fraction of tuples to allow to pass through this operator. It should be
+   * a number between 0 (no tuple passes) and 1 (every tuple passes) inclusive.
+   * @param samplingPercentage the samplingPercentage to set
+   */
+  public void setSamplingPercentage(double samplingPercentage)
+  {
+    this.samplingPercentage = samplingPercentage;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Change.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Change.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Change.java
new file mode 100644
index 0000000..146a65d
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Change.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.math;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.BaseNumberValueOperator;
+
+/**
+ * Operator compares data values arriving on input port with base value input operator.
+ * 
+ * <p>
+ * Arriving base value is stored in operator for comparison, old base value is overwritten.&nbsp;
+ * This emits &lt;change in value,percentage change&gt;.
+ * Operator expects values arriving on data input port and base value input operator.
+ * Change in value and percentage change in values are emitted on separate ports.<br>
+ * This operator can not be partitioned, since copies won't get consecutive operators. <br>
+ * This is StateFull operator, tuples that arrive on base port are kept in
+ * cache forever.<br>
+ * <br>
+ * <b>Input Ports</b>:<br>
+ * <b>data</b>: expects V extends Number, Data values<br>
+ * <b>base</b>: expects V extends Number, Base Value stored for comparison<br>
+ *
+ * <b>Output Ports</b>:<br>
+ * <b>change</b>: emits V extends Number,  Diff from base value<br>
+ * <b>percent</b>: emits Double, percent change in value compared to base value.<br>
+ * <br>
+ * <br>
+ * <b>Properties</b>:<br>
+ * <b>inverse</b>: if set to true the key in the filter will block tuple<br>
+ * <b>filterBy</b>: List of keys to filter on<br>
+ * <br>
+ * <b>Specific compile time checks</b>: None<br>
+ * <b>Specific run time checks</b>: None<br>
+ * <br>
+ *
+ * <br>
+ * @displayName Change
+ * @category Math
+ * @tags change, key value, numeric, percentage
+ * @since 0.3.3
+ * @deprecated
+ */
+@Deprecated
+public class Change<V extends Number> extends BaseNumberValueOperator<V>
+{
+  /**
+   * Input port for the data values that are compared against the stored base value.
+   */
+  public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
+  {
+    /**
+     * Emits the difference to the base value and that difference as a percentage of the base.
+     */
+    @Override
+    public void process(V tuple)
+    {
+      if (baseValue == 0) {
+        return; // a zero base cannot be used as divisor, so nothing is emitted
+      }
+      double delta = tuple.doubleValue() - baseValue;
+      change.emit(getValue(delta));
+      percent.emit((delta / baseValue) * 100);
+    }
+  };
+
+  /**
+   * Input port for the base value; a non-zero tuple replaces the stored base.
+   */
+  public final transient DefaultInputPort<V> base = new DefaultInputPort<V>()
+  {
+    /**
+     * Stores the tuple as the new base value unless it is zero.
+     */
+    @Override
+    public void process(V tuple)
+    {
+      double candidate = tuple.doubleValue();
+      if (candidate != 0.0) { // zero is rejected so the data port never divides by it
+        baseValue = candidate;
+      }
+    }
+  };
+
+  /**
+   * Optional output port carrying the data value minus the base value, converted by getValue.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<V> change = new DefaultOutputPort<V>();
+
+  /**
+   * Optional output port carrying the difference as a percentage of the base value.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Double> percent = new DefaultOutputPort<Double>();
+
+  /**
+   * Last non-zero value received on the base port; stays 0 until one arrives.
+   */
+  private double baseValue = 0;
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlert.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlert.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlert.java
new file mode 100644
index 0000000..c554ead
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlert.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.math;
+
+import javax.validation.constraints.Min;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.util.BaseNumberValueOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Compares consecutive input data values, emits &lt;value,percent change value&gt; pair on alert output port, if percent change exceeds a certain threshold value.
+ * <p>
+ * Operator is StateFull since current value is stored for comparison in next window. <br>
+ * This operator can not be partitioned, partitioning will result in inconsistent base value
+ * across replicated copies.
+ * <br>
+ *
+ * <b>Ports</b>:<br>
+ * <b>data</b>: expects V extends Number<br>
+ * <b>alert</b>: emits KeyValPair&lt;V,Double&gt;(1), the input value paired with the absolute percent change<br>
+ * <br>
+ * <b>Properties</b>:<br>
+ * <b>threshold</b>: The threshold of change between consecutive tuples of the
+ * same key that triggers an alert tuple<br>
+ * <b>inverse</b>: if set to true the key in the filter will block tuple<br>
+ * <b>filterBy</b>: List of keys to filter on<br>
+ * <br>
+ * <b>Specific compile time checks</b>: None<br>
+ * <b>Specific run time checks</b>: None<br>
+ * <br>
+ * @displayName Change Alert
+ * @category Rules and Alerts
+ * @tags change, key value, numeric, percentage
+ * @since 0.3.3
+ * @deprecated
+ */
+@Deprecated
+public class ChangeAlert<V extends Number> extends BaseNumberValueOperator<V>
+{
+  /**
+   * Input port that receives the numbers to compare against their predecessor.
+   */
+  public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
+  {
+    /**
+     * Measures the absolute percent change of the tuple relative to the stored
+     * base value and emits an alert when it is above the threshold. While the
+     * stored base is 0 the tuple only becomes the new base, without comparison.
+     */
+    @Override
+    public void process(V tuple)
+    {
+      final double current = tuple.doubleValue();
+      // A base of 0 cannot serve as divisor, so such a tuple is only recorded.
+      if (baseValue != 0) {
+        double percent = ((current - baseValue) / baseValue) * 100;
+        // Only the magnitude of the change matters for the alert.
+        percent = percent < 0.0 ? 0.0 - percent : percent;
+        if (percent > percentThreshold) {
+          alert.emit(new KeyValPair<V, Double>(cloneKey(tuple), percent));
+        }
+      }
+      // Every tuple, alerting or not, becomes the base for the next one.
+      baseValue = current;
+    }
+  };
+
+  /**
+   * Output port which emits cloneKey(tuple) paired with its absolute percent change.
+   */
+  public final transient DefaultOutputPort<KeyValPair<V, Double>> alert = new DefaultOutputPort<KeyValPair<V, Double>>();
+
+  /**
+   * Previously seen value, kept between tuples and therefore across windows.
+   */
+  private double baseValue = 0;
+  /**
+   * Percent change above which an alert is emitted.
+   */
+  @Min(1)
+  private double percentThreshold = 0.0;
+
+  /**
+   * Getter for the alert threshold.
+   *
+   * @return percent change above which an alert is emitted
+   */
+  @Min(1)
+  public double getPercentThreshold()
+  {
+    return this.percentThreshold;
+  }
+
+  /**
+   * Setter for the alert threshold.
+   * @param d percent change above which an alert is emitted
+   */
+  public void setPercentThreshold(double d)
+  {
+    this.percentThreshold = d;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertKeyVal.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertKeyVal.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertKeyVal.java
new file mode 100644
index 0000000..8d75ab4
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertKeyVal.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.math;
+
+import java.util.HashMap;
+
+import javax.validation.constraints.Min;
+
+import org.apache.commons.lang.mutable.MutableDouble;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Operator compares consecutive values arriving at input port mapped by keys, emits &lt;key,percent change&gt; pair on output alert port if percent change exceeds percentage threshold set in operator.
+ * <p>
+ * StateFull : Yes, current key/value is stored in operator for comparison in
+ * next successive windows. <br>
+ * Partition(s): No, base comparison value will be inconsistent across
+ * instantiated copies. <br>
+ * A stored value of 0 cannot serve as the base of a percent change; the next
+ * value for that key only replaces it and no alert is emitted. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>data</b>: expects KeyValPair&lt;K,V extends Number&gt;<br>
+ * <b>alert</b>: emits KeyValPair&lt;K,KeyValPair&lt;V,Double&gt;&gt;(1)<br>
+ * <br>
+ * <b>Properties</b>:<br>
+ * <b>percentThreshold</b>: The threshold of change, in percent, between consecutive tuples of the
+ * same key that triggers an alert tuple<br>
+ * <b>inverse</b>: if set to true the key in the filter will block tuple<br>
+ * <b>filterBy</b>: List of keys to filter on<br>
+ * @displayName Change Alert Key Value
+ * @category Rules and Alerts
+ * @tags change, key value, numeric, percentage
+ * @since 0.3.3
+ * @deprecated
+ */
+@Deprecated
+public class ChangeAlertKeyVal<K, V extends Number> extends
+    BaseNumberKeyValueOperator<K, V>
+{
+  /**
+   * Base map is a StateFull field. It is retained across windows and maps
+   * each processed key to the last value received for it.
+   */
+  private HashMap<K, MutableDouble> basemap = new HashMap<K, MutableDouble>();
+
+  /**
+   * Input data port that takes a key value pair.
+   */
+  public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
+  {
+    /**
+     * Compares the value with the last value stored for the same key and
+     * emits &lt;key,&lt;value,percent change&gt;&gt; on the alert port when the
+     * absolute percent change exceeds percentThreshold. The first value of a
+     * key, and any value following a stored 0, only updates the stored base.
+     * Keys rejected by doprocessKey are ignored.
+     *
+     * @param tuple the incoming key value pair
+     */
+    @Override
+    public void process(KeyValPair<K, V> tuple)
+    {
+      K key = tuple.getKey();
+      if (!doprocessKey(key)) {
+        return;
+      }
+      double tval = tuple.getValue().doubleValue();
+      MutableDouble val = basemap.get(key);
+      if (val == null) { // First value for this key: remember it as the base
+        basemap.put(cloneKey(key), new MutableDouble(tval));
+        return;
+      }
+      double base = val.doubleValue();
+      if (base != 0) { // Avoid divide by zero (would yield Infinity or NaN)
+        double percent = Math.abs((tval - base) / base * 100);
+        if (percent > percentThreshold) {
+          KeyValPair<V, Double> dmap = new KeyValPair<V, Double>(
+              cloneValue(tuple.getValue()), percent);
+          KeyValPair<K, KeyValPair<V, Double>> otuple = new KeyValPair<K, KeyValPair<V, Double>>(
+              cloneKey(key), dmap);
+          alert.emit(otuple);
+        }
+      }
+      // The current value is the base for the next tuple of this key.
+      val.setValue(tval);
+    }
+  };
+
+  /**
+   * Key,Percent Change output port.
+   */
+  public final transient DefaultOutputPort<KeyValPair<K, KeyValPair<V, Double>>> alert = new DefaultOutputPort<KeyValPair<K, KeyValPair<V, Double>>>();
+
+  /**
+   * Alert threshold percentage set by application.
+   */
+  @Min(1)
+  private double percentThreshold = 0.0;
+
+  /**
+   * getter function for threshold value
+   *
+   * @return threshold value, in percent
+   */
+  @Min(1)
+  public double getPercentThreshold()
+  {
+    return percentThreshold;
+  }
+
+  /**
+   * setter function for threshold value
+   *
+   * @param d threshold value, in percent
+   */
+  public void setPercentThreshold(double d)
+  {
+    percentThreshold = d;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertMap.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertMap.java
new file mode 100644
index 0000000..e8add80
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertMap.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.math;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.validation.constraints.Min;
+
+import org.apache.commons.lang.mutable.MutableDouble;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
+
+/**
+ * Operator stores  &lt;key,value&gt; pair in hash map across the windows for comparison and emits hash map of &lt;key,percent change in value for each key&gt; if percent change
+ * exceeds preset threshold.
+ * <p>
+ *
+ * StateFull : Yes, key/value pair in current window are stored for comparison in next window. <br>
+ * Partition : No, will yield wrong result, base value won't be consistent across instances. <br>
+ * A stored value of 0 cannot serve as the base of a percent change; the next value for that key
+ * only replaces it and no alert is emitted. <br>
+ *
+ * <b>Ports</b>:<br>
+ * <b>data</b>: expects Map&lt;K,V extends Number&gt;<br>
+ * <b>alert</b>: emits HashMap&lt;K,HashMap&lt;V,Double&gt;&gt;(1)<br>
+ * <br>
+ * <b>Properties</b>:<br>
+ * <b>percentThreshold</b>: The threshold of change, in percent, between consecutive tuples of the same key that triggers an alert tuple<br>
+ * <b>inverse</b>: if set to true the key in the filter will block tuple<br>
+ * <b>filterBy</b>: List of keys to filter on<br>
+ * @displayName Change Alert Map
+ * @category Rules and Alerts
+ * @tags change, key value, numeric, percentage, map
+ * @since 0.3.2
+ * @deprecated
+ */
+@Deprecated
+public class ChangeAlertMap<K, V extends Number> extends BaseNumberKeyValueOperator<K, V>
+{
+  /**
+   * Input data port that takes a map of &lt;key,value&gt;.
+   */
+  public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>()
+  {
+    /**
+     * For every entry of the tuple, compares the value with the last value
+     * stored for the same key and emits a single-entry map
+     * &lt;key,&lt;value,percent change&gt;&gt; on the alert port when the absolute
+     * percent change exceeds percentThreshold. The first value of a key, and
+     * any value following a stored 0, only updates the stored base. Keys
+     * rejected by doprocessKey are ignored.
+     *
+     * @param tuple the incoming map of key value pairs
+     */
+    @Override
+    public void process(Map<K, V> tuple)
+    {
+      for (Map.Entry<K, V> e: tuple.entrySet()) {
+        K key = e.getKey();
+        if (!doprocessKey(key)) {
+          continue;
+        }
+        double tval = e.getValue().doubleValue();
+        MutableDouble val = basemap.get(key);
+        if (val == null) { // First value for this key: remember it as the base
+          basemap.put(cloneKey(key), new MutableDouble(tval));
+          continue;
+        }
+        double base = val.doubleValue();
+        if (base != 0) { // Avoid divide by zero (would yield Infinity or NaN)
+          double percent = Math.abs((tval - base) / base * 100);
+          if (percent > percentThreshold) {
+            HashMap<V,Double> dmap = new HashMap<V,Double>(1);
+            dmap.put(cloneValue(e.getValue()), percent);
+            HashMap<K,HashMap<V,Double>> otuple = new HashMap<K,HashMap<V,Double>>(1);
+            otuple.put(cloneKey(key), dmap);
+            alert.emit(otuple);
+          }
+        }
+        // The current value is the base for the next tuple of this key.
+        val.setValue(tval);
+      }
+    }
+  };
+
+  // Default "pass through" unifier works as tuple is emitted as pass through
+  /**
+   * Output port which emits a hashmap of key, percentage change.
+   */
+  public final transient DefaultOutputPort<HashMap<K, HashMap<V,Double>>> alert = new DefaultOutputPort<HashMap<K, HashMap<V,Double>>>();
+
+  /**
+   * basemap is a stateful field. It is retained across windows and maps each
+   * processed key to the last value received for it.
+   */
+  private HashMap<K,MutableDouble> basemap = new HashMap<K,MutableDouble>();
+
+  /**
+   * Alert threshold percentage set by application.
+   */
+  @Min(1)
+  private double percentThreshold = 0.0;
+
+  /**
+   * getter function for threshold value
+   * @return threshold value, in percent
+   */
+  @Min(1)
+  public double getPercentThreshold()
+  {
+    return percentThreshold;
+  }
+
+  /**
+   * setter function for threshold value
+   * @param d threshold value, in percent
+   */
+  public void setPercentThreshold(double d)
+  {
+    percentThreshold = d;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ChangeKeyVal.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ChangeKeyVal.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ChangeKeyVal.java
new file mode 100644
index 0000000..0600e6a
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ChangeKeyVal.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.math;
+
+import java.util.HashMap;
+
+import org.apache.commons.lang.mutable.MutableDouble;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+
+import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Operator compares &lt;key,value&gt; pairs arriving at data and base input ports and stores &lt;key,value&gt; pairs arriving at base port in hash map across the windows.
+ * <p/>
+ * The &lt;key,value&gt; pairs that arrive at data port are compared with base value if the key exists in the hash map.&nbsp;
+ * Change value and percentage are emitted on separate ports.
+ * StateFull : Yes, base map values are stored across windows. <br>
+ * Partitions : Yes, values on the base port are replicated across all partitions. However the order of tuples on the
+ * output stream may change.
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>data</b>: expects KeyValPair&lt;K,V extends Number&gt;<br>
+ * <b>base</b>: expects KeyValPair&lt;K,V extends Number&gt;<br>
+ * <b>change</b>: emits KeyValPair&lt;K,V&gt;(1)<br>
+ * <b>percent</b>: emits KeyValPair&lt;K,Double&gt;(1)<br>
+ * <br>
+ * <br>
+ * <b>Properties</b>:<br>
+ * <b>inverse</b>: if set to true the key in the filter will block tuple<br>
+ * <b>filterBy</b>: List of keys to filter on<br>
+ *
+ * @displayName Change Key Value
+ * @category Math
+ * @tags change, key value
+ * @since 0.3.3
+ * @deprecated
+ */
+@Deprecated
+public class ChangeKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K, V>
+{
+  /**
+   * basemap is a stateful field. It is retained across windows
+   */
+  private HashMap<K, MutableDouble> basemap = new HashMap<K, MutableDouble>();
+
+  /**
+   * Input data port that takes key value pairs.
+   */
+  public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
+  {
+    /**
+     * Process each key, compute change or percent, and emit it.
+     */
+    @Override
+    public void process(KeyValPair<K, V> tuple)
+    {
+      K key = tuple.getKey();
+      if (!doprocessKey(key)) {
+        return;
+      }
+      MutableDouble bval = basemap.get(key);
+      if (bval != null) { // Only process keys that are in the basemap
+        double cval = tuple.getValue().doubleValue() - bval.doubleValue();
+        change.emit(new KeyValPair<K, V>(cloneKey(key), getValue(cval)));
+        percent.emit(new KeyValPair<K, Double>(cloneKey(key), (cval / bval.doubleValue()) * 100));
+      }
+    }
+  };
+
+  /**
+   * Base value input port, stored in base map for comparison.
+   */
+  public final transient DefaultInputPort<KeyValPair<K, V>> base = new DefaultInputPort<KeyValPair<K, V>>()
+  {
+    /**
+     * Process each key to store the value. If same key appears again update
+     * with latest value.
+     */
+    @Override
+    public void process(KeyValPair<K, V> tuple)
+    {
+      if (tuple.getValue().doubleValue() != 0.0) { // Avoid divide by zero, Emit
+        // an error tuple?
+        MutableDouble val = basemap.get(tuple.getKey());
+        if (val == null) {
+          val = new MutableDouble(0.0);
+          basemap.put(cloneKey(tuple.getKey()), val);
+        }
+        val.setValue(tuple.getValue().doubleValue());
+      }
+    }
+  };
+
+  /**
+   * Key, Change output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<K, V>> change = new DefaultOutputPort<KeyValPair<K, V>>();
+
+  /**
+   * Key, Percentage Change pair output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<K, Double>> percent = new DefaultOutputPort<KeyValPair<K, Double>>();
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/CompareExceptMap.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/CompareExceptMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/CompareExceptMap.java
new file mode 100644
index 0000000..155cb23
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/CompareExceptMap.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.math;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.lib.algo.MatchMap;
+import com.datatorrent.lib.util.UnifierHashMap;
+
+/**
+ * Operator compares based on the property "key", "value", and "compare".
+ * <p>
+ * The comparison is done by getting double value from the Number.
+ * Passed tuples are emitted on the output port "compare".&nbsp; 
+ * Failed tuples are emitted on port "except".
+ * Both output ports are optional, but at least one has to be connected.
+ * This module is a pass through<br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>data</b>: expects Map&lt;K,V&gt;<br>
+ * <b>compare</b>: emits HashMap&lt;K,V&gt;<br>
+ * <b>except</b>: emits HashMap&lt;K,V&gt;<br>
+ * <br>
+ * <b>Properties</b>:<br>
+ * <b>key</b>: The key on which compare is done<br>
+ * <b>value</b>: The value to compare with<br>
+ * <b>cmp</b>: The compare function. Supported values are "lte", "lt", "eq", "neq", "gt", "gte". Default is "eq"<br>
+ * <br>
+ * <b>Compile time checks</b>:<br>
+ * Key must be non empty<br>
+ * Value must be able to convert to a "double"<br>
+ * Compare string, if specified, must be one of "lte", "lt", "eq", "neq", "gt", "gte"<br>
+ * <b>Specific run time checks</b>:<br>
+ * Does the incoming HashMap have the key<br>
+ * Is the value of the key a number<br>
+ * <p>
+ * <b>Benchmarks</b>: Blast as many tuples as possible in inline mode<br>
+ * <table border="1" cellspacing=1 cellpadding=1 summary="Benchmark table for CompareExceptMap&lt;K,V extends Number&gt; operator template">
+ * <tr><th>In-Bound</th><th>Out-bound</th><th>Comments</th></tr>
+ * <tr><td><b>5 Million K,V pairs/s</b></td><td>Each tuple is emitted if emitError is set to true</td><td>In-bound rate determines performance as every tuple is emitted.
+ * Immutable tuples were used in the benchmarking. If you use mutable tuples and have lots of keys, the benchmarks may be lower</td></tr>
+ * </table><br>
+ * <p>
+ * <b>Function Table (K=String, V=Integer); emitError=true; key=a; value=3; cmp=eq)</b>:
+ * <table border="1" cellspacing=1 cellpadding=1 summary="Function table for CompareExceptMap&lt;K,V extends Number&gt; operator template">
+ * <tr><th rowspan=2>Tuple Type (api)</th><th>In-bound (process)</th><th colspan=2>Out-bound (emit)</th></tr>
+ * <tr><th><i>data</i>(HashMap&lt;K,V&gt;)</th><th><i>compare</i>(HashMap&lt;K,V&gt;)</th><th><i>except</i>(HashMap&lt;K,V&gt;)</th></tr>
+ * <tr><td>Begin Window (beginWindow())</td><td>N/A</td><td>N/A</td><td>N/A</td></tr>
+ * <tr><td>Data (process())</td><td>{a=2,b=20,c=1000}</td><td></td><td>{a=2,b=20,c=1000}</td></tr>
+ * <tr><td>Data (process())</td><td>{a=3,b=40,c=2}</td><td>{a=3,b=40,c=2}</td><td></td></tr>
+ * <tr><td>Data (process())</td><td>{a=10,b=5}</td><td></td><td>{a=10,b=5}</td></tr>
+ * <tr><td>Data (process())</td><td>{d=55,b=12}</td><td></td><td>{d=55,b=12}</td></tr>
+ * <tr><td>Data (process())</td><td>{d=22,a=4}</td><td></td><td>{d=22,a=4}</td></tr>
+ * <tr><td>Data (process())</td><td>{d=4,a=3,g=5,h=44}</td><td>{d=4,a=3,g=5,h=44}</td><td></td></tr>
+ * <tr><td>End Window (endWindow())</td><td>N/A</td><td>N/A</td><td>N/A</td></tr>
+ * </table>
+ * <br>
+ * <br>
+ * @displayName Compare Except Map
+ * @category Math
+ * @tags comparison, key value, number, hash map
+ * @since 0.3.2
+ * @deprecated
+ */
+@Deprecated
+@Stateless
+public class CompareExceptMap<K, V extends Number> extends MatchMap<K, V>
+{
+  /**
+   * Output port for tuples that satisfy the comparison. It is the inherited
+   * match port exposed under the name "compare".
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<HashMap<K, V>> compare = match;
+
+  /**
+   * Output port for tuples that do not satisfy the comparison.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<HashMap<K, V>> except = new DefaultOutputPort<HashMap<K, V>>()
+  {
+    @Override
+    public Unifier<HashMap<K, V>> getUnifier()
+    {
+      return new UnifierHashMap<K, V>();
+    }
+  };
+
+  /**
+   * Sends a copy of the matched tuple to the compare port; does nothing when
+   * that port is not connected.
+   * @param tuple tuple that passed the comparison
+   */
+  @Override
+  public void tupleMatched(Map<K, V> tuple)
+  {
+    if (!compare.isConnected()) {
+      return;
+    }
+    compare.emit(cloneTuple(tuple));
+  }
+
+  /**
+   * Sends a copy of the unmatched tuple to the except port; does nothing when
+   * that port is not connected.
+   * @param tuple tuple that failed the comparison
+   */
+  @Override
+  public void tupleNotMatched(Map<K, V> tuple)
+  {
+    if (!except.isConnected()) {
+      return;
+    }
+    except.emit(cloneTuple(tuple));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/CompareMap.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/CompareMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/CompareMap.java
new file mode 100644
index 0000000..e263d3f
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/CompareMap.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.math;
+
+import java.util.HashMap;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.lib.algo.MatchMap;
+
+/**
+ * This operator compares tuples subclassed from Number based on the property "key", "value", and "cmp", and matching tuples are emitted.
+ * <p>
+ * If the tuple passed the test, it is emitted on the output port "compare".&nbsp; The comparison is done by getting double value from the Number.
+ * Both output ports are optional, but at least one has to be connected.
+ * This module is a pass through<br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>data</b>: expects Map&lt;K,V extends Number&gt;<br>
+ * <b>compare</b>: emits HashMap&lt;K,V&gt;<br>
+ * <br>
+ * <b>Properties</b>:<br>
+ * <b>key</b>: The key on which compare is done<br>
+ * <b>value</b>: The value to compare with<br>
+ * <b>cmp</b>: The compare function. Supported values are "lte", "lt", "eq", "neq", "gt", "gte". Default is "eq"<br>
+ * <br>
+ * <b>Compile time checks</b>:<br>
+ * Key must be non empty<br>
+ * Value must be able to convert to a "double"<br>
+ * CompareMap string, if specified, must be one of "lte", "lt", "eq", "neq", "gt", "gte"<br>
+ * <br>
+ * <b>Specific run time checks</b>:<br>
+ * Does the incoming HashMap have the key<br>
+ * Is the value of the key a number<br>
+ * <p>
+ * <b>Benchmarks</b>: Blast as many tuples as possible in inline mode<br>
+ * <table border="1" cellspacing=1 cellpadding=1 summary="Benchmark table for CompareMap&lt;K,V extends Number&gt; operator template">
+ * <tr><th>In-Bound</th><th>Out-bound</th><th>Comments</th></tr>
+ * <tr><td><b>8 Million K,V pairs/s</b></td><td>Each matched tuple is emitted</td><td>In-bound rate and number of tuples that match determine performance.
+ * Immutable tuples were used in the benchmarking. If you use mutable tuples and have lots of keys, the benchmarks may be lower</td></tr>
+ * </table><br>
+ * <p>
+ * <b>Function Table (K=String,V=Integer); emitError=true; key=a; value=3; cmp=eq)</b>:
+ * <table border="1" cellspacing=1 cellpadding=1 summary="Function table for CompareMap&lt;K,V extends Number&gt; operator template">
+ * <tr><th rowspan=2>Tuple Type (api)</th><th>In-bound (process)</th><th>Out-bound (emit)</th></tr>
+ * <tr><th><i>data</i>(Map&lt;K,V&gt;)</th><th><i>compare</i>(HashMap&lt;K,V&gt;)</th></tr>
+ * <tr><td>Begin Window (beginWindow())</td><td>N/A</td><td>N/A</td></tr>
+ * <tr><td>Data (process())</td><td>{a=2,b=20,c=1000}</td><td></td></tr>
+ * <tr><td>Data (process())</td><td>{a=3,b=40,c=2}</td><td>{a=3,b=40,c=2}</td></tr>
+ * <tr><td>Data (process())</td><td>{a=10,b=5}</td><td></td></tr>
+ * <tr><td>Data (process())</td><td>{d=55,b=12}</td><td></td></tr>
+ * <tr><td>Data (process())</td><td>{d=22,a=4}</td><td></td></tr>
+ * <tr><td>Data (process())</td><td>{d=4,a=3,g=5,h=44}</td><td>{d=4,a=3,g=5,h=44}</td></tr>
+ * <tr><td>End Window (endWindow())</td><td>N/A</td><td>N/A</td></tr>
+ * </table>
+ * <br>
+ * <br>
+ * @displayName Compare Map
+ * @category Math
+ * @tags comparison, key value, numeric, map
+ * @since 0.3.2
+ * @deprecated
+ */
+@Deprecated
+@Stateless
+public class CompareMap<K, V extends Number> extends MatchMap<K,V>
+{
+  /**
+   * Output port that emits a hashmap of matching number tuples after comparison.
+   * It is the inherited match port exposed under the name "compare"; this
+   * class declares no processing logic of its own.
+   */
+  public final transient DefaultOutputPort<HashMap<K, V>> compare = match;
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/CountKeyVal.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/CountKeyVal.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/CountKeyVal.java
new file mode 100644
index 0000000..a229796
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/CountKeyVal.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.math;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.mutable.MutableInt;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.BaseKeyValueOperator;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.UnifierCountOccurKey;
+
+/**
+ * This Operator counts the occurrences of each key in the &lt;key,value&gt; pairs arriving at the input port. A &lt;Key,Occurrence count&gt; pair is emitted for each key on the output port at the end of every window.
+ * <p>
+ * <br>
+ * StateFull : Yes, key occurrence is aggregated while a window is open; the counts are cleared once they have been emitted at the end of the window. <br>
+ * Partitions : Yes, count occurrence unifier at output port. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>data</b>: expects KeyValPair&lt;K,V&gt;<br>
+ * <b>count</b>: emits KeyValPair&lt;K,Integer&gt;<br>
+ * <br>
+ * @displayName Count Key Value
+ * @category Math
+ * @tags count, key value, aggregate
+ * @since 0.3.3
+ * @deprecated
+ */
+@Deprecated
+public class CountKeyVal<K, V> extends BaseKeyValueOperator<K, V>
+{
+
+  /**
+   * Key occurrence count map for the current window.
+   */
+  protected HashMap<K, MutableInt> counts = new HashMap<K, MutableInt>();
+
+  /**
+   * Input data port that takes key value pair.
+   */
+  public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
+  {
+    /**
+     * For each tuple (a key value pair): counts one more occurrence of its
+     * key. The value of the pair is not used.
+     *
+     * @param tuple the incoming key value pair
+     */
+    @Override
+    public void process(KeyValPair<K, V> tuple)
+    {
+      K key = tuple.getKey();
+      MutableInt count = counts.get(key);
+      if (count == null) {
+        count = new MutableInt(0);
+        counts.put(cloneKey(key), count);
+      }
+      count.increment();
+    }
+
+    /**
+     * Uses the stream codec returned by getKeyValPairStreamCodec().
+     */
+    @Override
+    public StreamCodec<KeyValPair<K, V>> getStreamCodec()
+    {
+      return getKeyValPairStreamCodec();
+    }
+  };
+
+  /**
+   * Key, occurrence value pair output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<K, Integer>> count = new DefaultOutputPort<KeyValPair<K, Integer>>()
+  {
+    @Override
+    public UnifierCountOccurKey<K> getUnifier()
+    {
+      return new UnifierCountOccurKey<K>();
+    }
+  };
+
+  /**
+   * Emits one &lt;key,count&gt; pair on the count port for every key counted
+   * during the window, then clears the counts so the next window starts
+   * from zero.
+   */
+  @Override
+  public void endWindow()
+  {
+    for (Map.Entry<K, MutableInt> e : counts.entrySet()) {
+      count.emit(new KeyValPair<K, Integer>(e.getKey(), e.getValue().intValue()));
+    }
+    counts.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ExceptMap.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ExceptMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ExceptMap.java
new file mode 100644
index 0000000..3dcae74
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ExceptMap.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.math;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.lib.algo.MatchMap;
+import com.datatorrent.lib.util.UnifierHashMap;
+
+/**
+ * This operator compares tuples sub-classed from Number using the properties "key", "value" and "cmp", and emits the tuples that do not match.
+ * <p>
+ * The comparison is done by getting double value from the Number. This
+ * operator does not emit matched tuples; unmatched ones go to the except port.
+ * <p>
+ * This module is a pass through<br>
+ * <br>
+ * <br>
+ * StateFull : No, output is emitted in current window. <br>
+ * Partitions : Yes, No state dependency among input tuples. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>data</b>: expects Map&lt;K,V extends Number&gt;<br>
+ * <b>except</b>: emits HashMap&lt;K,V&gt;<br>
+ * <br>
+ * <b>Properties</b>:<br>
+ * <b>key</b>: The key on which compare is done<br>
+ * <b>value</b>: The value to compare with<br>
+ * <b>cmp</b>: The compare function. Supported values are "lte", "lt", "eq",
+ * "neq", "gt", "gte". Default is "eq"<br>
+ * <br>
+ * <b>Compile time checks</b>:<br>
+ * Key must be non empty<br>
+ * Value must be able to convert to a "double"<br>
+ * Compare string, if specified, must be one of "lte", "lt", "eq", "neq", "gt",
+ * "gte"<br>
+ * <br>
+ * <b>Run time checks</b>:<br>
+ * Does the incoming HashMap have the key, Is the value of the key a number<br>
+ * <br>
+ * @displayName Except Map
+ * @category Math
+ * @tags comparison, Number
+ * @since 0.3.3
+ * @deprecated
+ */
+@Deprecated
+@Stateless
+public class ExceptMap<K, V extends Number> extends MatchMap<K, V>
+{
+  /**
+   * Output port on which the tuples that did not match are emitted.
+   */
+  public final transient DefaultOutputPort<HashMap<K, V>> except = new DefaultOutputPort<HashMap<K, V>>()
+  {
+    @Override
+    public Unifier<HashMap<K, V>> getUnifier()
+    {
+      return new UnifierHashMap<K, V>();
+    }
+  };
+
+  /**
+   * Emits the tuple returned by cloneTuple (overridable by users) on except.
+   *
+   * @param tuple the tuple that did not match
+   */
+  @Override
+  public void tupleNotMatched(Map<K, V> tuple)
+  {
+    final HashMap<K, V> copy = cloneTuple(tuple);
+    except.emit(copy);
+  }
+
+  /**
+   * Intentionally empty so that matching tuples are not emitted; calling
+   * super.tupleMatched() would emit the tuple.
+   *
+   * @param tuple the tuple that matched; ignored
+   */
+  @Override
+  public void tupleMatched(Map<K, V> tuple)
+  {
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Quotient.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Quotient.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Quotient.java
new file mode 100644
index 0000000..e1deb9d
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Quotient.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.math;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.lib.util.BaseNumberValueOperator;
+
+/**
+ * This operator adds all the values on "numerator" and "denominator" and emits quotient at end of window.
+ * <p>
+ * <br>
+ * <b>StateFull : Yes </b>, Sum of values is taken over application window. <br>
+ * <b>Partitions : No </b>, will yield wrong results, since values are
+ * accumulated over application window. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>numerator</b>: expects V extends Number<br>
+ * <b>denominator</b>: expects V extends Number<br>
+ * <b>quotient</b>: emits V, converted from the double quotient by getValue<br>
+ * <br>
+ * <b>Properties : </b> <br>
+ * <b>mult_by : </b>Multiply by value(default = 1). <br>
+ * <br>
+ * @displayName Quotient
+ * @category Math
+ * @tags division, sum, numeric
+ * @since 0.3.3
+ * @deprecated
+ */
+@Deprecated
+@OperatorAnnotation(partitionable = false)
+public class Quotient<V extends Number> extends BaseNumberValueOperator<V>
+{
+  /** Sum of the values received on the numerator port in the current window. */
+  protected double nval = 0.0;
+  /** Sum of the values received on the denominator port in the current window. */
+  protected double dval = 0.0;
+  /** Constant the quotient is multiplied by before it is emitted. */
+  int mult_by = 1;
+
+  /**
+   * Numerator values input port.
+   */
+  public final transient DefaultInputPort<V> numerator = new DefaultInputPort<V>()
+  {
+    /** Adds the tuple's double value to the numerator sum. */
+    @Override
+    public void process(V tuple)
+    {
+      nval += tuple.doubleValue();
+    }
+  };
+
+  /**
+   * Denominator values input port.
+   */
+  public final transient DefaultInputPort<V> denominator = new DefaultInputPort<V>()
+  {
+    /** Adds the tuple's double value to the denominator sum. */
+    @Override
+    public void process(V tuple)
+    {
+      dval += tuple.doubleValue();
+    }
+  };
+
+  /**
+   * Quotient output port.
+   */
+  public final transient DefaultOutputPort<V> quotient = new DefaultOutputPort<V>();
+
+  /** Sets the constant the quotient is multiplied by. */
+  public void setMult_by(int i)
+  {
+    mult_by = i;
+  }
+
+  /**
+   * Emits (numerator sum / denominator sum) * mult_by, converted by getValue,
+   * unless the denominator sum is 0. Both sums are reset in either case.
+   */
+  @Override
+  public void endWindow()
+  {
+    if (dval != 0) {
+      quotient.emit(getValue((nval / dval) * mult_by));
+    }
+    // Reset even when nothing was emitted, otherwise the numerator sum of a
+    // window with a zero denominator would leak into the next window.
+    nval = 0.0;
+    dval = 0.0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/QuotientMap.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/QuotientMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/QuotientMap.java
new file mode 100644
index 0000000..3581b81
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/QuotientMap.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.math;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.validation.constraints.Min;
+
+import org.apache.commons.lang.mutable.MutableDouble;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
+
+/**
+ * Add all the values for each key on "numerator" and "denominator" and emits quotient at end of window for all keys in the denominator.
+ * <p>
+ * <br>
+ * Application can set multiplication value for quotient(default = 1). <br>
+ * Operator will calculate quotient of occurrence of key in numerator divided by
+ * occurrence of key in denominator if countKey flag is true. <br>
+ * Application can allow or block keys by setting filter key and inverse flag. <br>
+ * <br>
+ * <b>StateFull : Yes</b>, numerator/denominator values are summed over
+ * application window. <br>
+ * <b>Partitions : No</b>, will yield wrong results, since values are summed
+ * over app window. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>numerator</b>: expects Map&lt;K,V extends Number&gt;<br>
+ * <b>denominator</b>: expects Map&lt;K,V extends Number&gt;<br>
+ * <b>quotient</b>: emits HashMap&lt;K,Double&gt;<br>
+ * <br>
+ * <b>Properties</b>:<br>
+ * <b>inverse :</b> if set to true the key in the filter will block tuple<br>
+ * <b>filterBy :</b> List of keys to filter on<br>
+ * <b>countkey :</b> Get quotient of occurrence of keys in numerator and
+ * denominator. <br>
+ * <b>mult_by :</b> Set multiply by constant value. <br>
+ * <br>
+ * @displayName Quotient Map
+ * @category Math
+ * @tags division, sum, map
+ * @since 0.3.3
+ * @deprecated
+ */
+@Deprecated
+@OperatorAnnotation(partitionable = false)
+public class QuotientMap<K, V extends Number> extends
+    BaseNumberKeyValueOperator<K, V>
+{
+  /**
+   * Numerator key/sum value map.
+   */
+  protected HashMap<K, MutableDouble> numerators = new HashMap<K, MutableDouble>();
+
+  /**
+   * Denominator key/sum value map.
+   */
+  protected HashMap<K, MutableDouble> denominators = new HashMap<K, MutableDouble>();
+
+  /**
+   * Count occurrence of keys if set to true.
+   */
+  boolean countkey = false;
+
+  /**
+   * Quotient multiply by value.
+   */
+  int mult_by = 1;
+
+  /**
+   * Numerator input port.
+   */
+  public final transient DefaultInputPort<Map<K, V>> numerator = new DefaultInputPort<Map<K, V>>()
+  {
+    /**
+     * Adds the tuple to the numerator map
+     */
+    @Override
+    public void process(Map<K, V> tuple)
+    {
+      addTuple(tuple, numerators);
+    }
+  };
+
+  /**
+   * Denominator input port.
+   */
+  public final transient DefaultInputPort<Map<K, V>> denominator = new DefaultInputPort<Map<K, V>>()
+  {
+    /**
+     * Adds the tuple to the denominator map
+     */
+    @Override
+    public void process(Map<K, V> tuple)
+    {
+      addTuple(tuple, denominators);
+    }
+  };
+
+  /**
+   * Quotient output port.
+   */
+  public final transient DefaultOutputPort<HashMap<K, Double>> quotient = new DefaultOutputPort<HashMap<K, Double>>();
+
+  /**
+   * Add tuple to nval/dval map.
+   *
+   * @param tuple
+   *          key/value map on input port.
+   * @param map
+   *          key/summed value map.
+   */
+  public void addTuple(Map<K, V> tuple, Map<K, MutableDouble> map)
+  {
+    for (Map.Entry<K, V> e : tuple.entrySet()) {
+      addEntry(e.getKey(), e.getValue(), map);
+    }
+  }
+
+  /**
+   * Add/Update entry to key/sum value map. Keys rejected by doprocessKey and
+   * null values are skipped. In countkey mode every accepted entry adds 1 to
+   * the key's sum, otherwise it adds the value's double value.
+   *
+   * @param key
+   *          name.
+   * @param value
+   *          value for key.
+   * @param map
+   *          numerator/denominator key/sum map.
+   */
+  public void addEntry(K key, V value, Map<K, MutableDouble> map)
+  {
+    if (!doprocessKey(key) || (value == null)) {
+      return;
+    }
+    final double increment = countkey ? 1.0 : value.doubleValue();
+    MutableDouble val = map.get(key);
+    if (val == null) {
+      // Only a key that is new to the map needs to be cloned and inserted; an
+      // existing sum is updated in place.
+      map.put(cloneKey(key), new MutableDouble(increment));
+    } else {
+      val.add(increment);
+    }
+  }
+
+  /**
+   * getter for mult_by
+   *
+   * @return mult_by
+   */
+  @Min(0)
+  public int getMult_by()
+  {
+    return mult_by;
+  }
+
+  /**
+   * getter for countkey
+   *
+   * @return countkey
+   */
+  public boolean getCountkey()
+  {
+    return countkey;
+  }
+
+  /**
+   * Setter for mult_by
+   *
+   * @param i the new mult_by value
+   */
+  public void setMult_by(int i)
+  {
+    mult_by = i;
+  }
+
+  /**
+   * setter for countkey
+   *
+   * @param i
+   *          sets countkey
+   */
+  public void setCountkey(boolean i)
+  {
+    countkey = i;
+  }
+
+  /**
+   * Emits a single map holding, for every key in the denominator map,
+   * (numerator sum / denominator sum) * mult_by. A key present only in the
+   * denominator map yields 0.0; a key present only in the numerator map is
+   * ignored. Nothing is emitted if the denominator map is empty. Both internal
+   * maps are cleared afterwards.
+   * <p>
+   * NOTE(review): a denominator sum of 0 is not filtered out, so the double
+   * division yields Infinity or NaN for that key - confirm this is intended.
+   */
+  @Override
+  public void endWindow()
+  {
+    HashMap<K, Double> tuples = new HashMap<K, Double>();
+    for (Map.Entry<K, MutableDouble> e : denominators.entrySet()) {
+      MutableDouble nval = numerators.get(e.getKey());
+      // Without any numerator value for the key the quotient is reported as 0.
+      double ratio = 0.0;
+      if (nval != null) {
+        ratio = (nval.doubleValue() / e.getValue().doubleValue()) * mult_by;
+      }
+      tuples.put(e.getKey(), Double.valueOf(ratio));
+    }
+    if (!tuples.isEmpty()) {
+      quotient.emit(tuples);
+    }
+    numerators.clear();
+    denominators.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/SumCountMap.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/SumCountMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/SumCountMap.java
new file mode 100644
index 0000000..048eff7
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/SumCountMap.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.math;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.mutable.MutableDouble;
+import org.apache.commons.lang.mutable.MutableInt;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
+import com.datatorrent.lib.util.UnifierHashMapInteger;
+import com.datatorrent.lib.util.UnifierHashMapSumKeys;
+
+/**
+ * Emits the sum and count of values for each key at the end of window.
+ * <p>
+ * Application accumulate sum across streaming window by setting cumulative flag
+ * to true. <br>
+ * This is an end of window operator<br>
+ * <br>
+ * <b>StateFull : Yes</b>, Sum is computed over application window and streaming
+ * window. <br>
+ * <b>Partitions : Yes</b>, Sum is unified at output port. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>data</b>: expects Map&lt;K,V extends Number&gt;<br>
+ * <b>sum</b>: emits HashMap&lt;K,V&gt;; sumDouble, sumInteger, sumLong, sumShort and sumFloat emit the same sums as the named type<br>
+ * <b>count</b>: emits HashMap&lt;K,Integer&gt;<br>
+ * <br>
+ * <b>Properties</b>:<br>
+ * <b>inverse</b>: if set to true the key in the filter will block tuple<br>
+ * <b>filterBy</b>: List of keys to filter on<br>
+ * <b>cumulative</b>: boolean flag, if set the sum is not cleared at the end of
+ * window, <br>
+ * hence generating cumulative sum across streaming windows. Default is false.<br>
+ * <br>
+ * @displayName Sum Count Map
+ * @category Math
+ * @tags  number, sum, counting, map
+ * @since 0.3.3
+ * @deprecated
+ */
+@Deprecated
+public class SumCountMap<K, V extends Number> extends
+    BaseNumberKeyValueOperator<K, V>
+{
+  /**
+   * Key/double sum map.
+   */
+  protected HashMap<K, MutableDouble> sums = new HashMap<K, MutableDouble>();
+
+  /**
+   * Key/occurrence count map.
+   */
+  protected HashMap<K, MutableInt> counts = new HashMap<K, MutableInt>();
+
+  /**
+   * Cumulative sum flag.
+   */
+  protected boolean cumulative = false;
+
+  /**
+   * Input port that takes a map.&nbsp; It adds the values for each key and counts the number of occurrences for each key.
+   */
+  public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>()
+  {
+    /**
+     * For each entry accepted by doprocessKey: adds the value to the key's sum
+     * and counts the key. NOTE(review): a null value raises an NPE when summing.
+     */
+    @Override
+    public void process(Map<K, V> tuple)
+    {
+      // Sums feed every sum* port, so they are needed when any of them is connected.
+      final boolean trackSums = sum.isConnected() || sumDouble.isConnected() || sumInteger.isConnected()
+          || sumLong.isConnected() || sumShort.isConnected() || sumFloat.isConnected();
+      final boolean trackCounts = SumCountMap.this.count.isConnected();
+      for (Map.Entry<K, V> e : tuple.entrySet()) {
+        K key = e.getKey();
+        if (!doprocessKey(key)) {
+          continue;
+        }
+        if (trackSums) {
+          MutableDouble val = sums.get(key);
+          if (val == null) {
+            sums.put(cloneKey(key), new MutableDouble(e.getValue().doubleValue()));
+          } else {
+            val.add(e.getValue().doubleValue());
+          }
+        }
+        if (trackCounts) {
+          MutableInt occurrences = counts.get(key);
+          if (occurrences == null) {
+            occurrences = new MutableInt(0);
+            counts.put(cloneKey(key), occurrences);
+          }
+          occurrences.increment();
+        }
+      }
+    }
+  };
+
+  /**
+   * Key,sum map output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<HashMap<K, V>> sum = new DefaultOutputPort<HashMap<K, V>>()
+  {
+    @Override
+    public Unifier<HashMap<K, V>> getUnifier()
+    {
+      return new UnifierHashMapSumKeys<K, V>();
+    }
+  };
+
+  /**
+   * Key,double sum map output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<HashMap<K, Double>> sumDouble = new DefaultOutputPort<HashMap<K, Double>>()
+  {
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public Unifier<HashMap<K, Double>> getUnifier()
+    {
+      UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Double>();
+      ret.setType(Double.class);
+      return ret;
+    }
+  };
+
+  /**
+   * Key,integer sum output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<HashMap<K, Integer>> sumInteger = new DefaultOutputPort<HashMap<K, Integer>>()
+  {
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public Unifier<HashMap<K, Integer>> getUnifier()
+    {
+      UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Integer>();
+      ret.setType(Integer.class);
+      return ret;
+    }
+  };
+
+  /**
+   * Key,long sum output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<HashMap<K, Long>> sumLong = new DefaultOutputPort<HashMap<K, Long>>()
+  {
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public Unifier<HashMap<K, Long>> getUnifier()
+    {
+      UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Long>();
+      ret.setType(Long.class);
+      return ret;
+    }
+  };
+
+  /**
+   * Key,short sum output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<HashMap<K, Short>> sumShort = new DefaultOutputPort<HashMap<K, Short>>()
+  {
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public Unifier<HashMap<K, Short>> getUnifier()
+    {
+      UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Short>();
+      ret.setType(Short.class);
+      return ret;
+    }
+  };
+
+  /**
+   * Key,float sum output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<HashMap<K, Float>> sumFloat = new DefaultOutputPort<HashMap<K, Float>>()
+  {
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public Unifier<HashMap<K, Float>> getUnifier()
+    {
+      UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Float>();
+      ret.setType(Float.class);
+      return ret;
+    }
+  };
+
+  /**
+   * Key,occurrence count output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<HashMap<K, Integer>> count = new DefaultOutputPort<HashMap<K, Integer>>()
+  {
+    @Override
+    public Unifier<HashMap<K, Integer>> getUnifier()
+    {
+      return new UnifierHashMapInteger<K>();
+    }
+  };
+
+  /**
+   * Get cumulative flag.
+   *
+   * @return cumulative flag
+   */
+  public boolean isCumulative()
+  {
+    return cumulative;
+  }
+
+  /**
+   * set cumulative flag.
+   *
+   * @param cumulative
+   *          input flag
+   */
+  public void setCumulative(boolean cumulative)
+  {
+    this.cumulative = cumulative;
+  }
+
+  /**
+   * Calls emit once on every output port: the sums converted to each sum
+   * port's value type, and the occurrence counts on the count port. Clears
+   * the internal data afterwards unless the cumulative flag is set.
+   */
+  @Override
+  public void endWindow()
+  {
+    // Should allow users to send each key as a separate tuple to load balance.
+    // This is an aggregate node, so load balancing would most likely not be needed.
+
+    HashMap<K, V> tuples = new HashMap<K, V>();
+    HashMap<K, Integer> ctuples = new HashMap<K, Integer>();
+    HashMap<K, Double> dtuples = new HashMap<K, Double>();
+    HashMap<K, Integer> ituples = new HashMap<K, Integer>();
+    HashMap<K, Float> ftuples = new HashMap<K, Float>();
+    HashMap<K, Long> ltuples = new HashMap<K, Long>();
+    HashMap<K, Short> stuples = new HashMap<K, Short>();
+
+    for (Map.Entry<K, MutableDouble> e : sums.entrySet()) {
+      K key = e.getKey();
+      MutableDouble val = e.getValue();
+      tuples.put(key, getValue(val.doubleValue()));
+      dtuples.put(key, val.doubleValue());
+      ituples.put(key, val.intValue());
+      ftuples.put(key, val.floatValue());
+      ltuples.put(key, val.longValue());
+      stuples.put(key, val.shortValue());
+    }
+    // Counts are read from their own map so that they are also emitted when
+    // only the count port is connected and no sums were accumulated.
+    for (Map.Entry<K, MutableInt> e : counts.entrySet()) {
+      ctuples.put(e.getKey(), e.getValue().toInteger());
+    }
+
+    sum.emit(tuples);
+    sumDouble.emit(dtuples);
+    sumInteger.emit(ituples);
+    sumLong.emit(ltuples);
+    sumShort.emit(stuples);
+    sumFloat.emit(ftuples);
+    count.emit(ctuples);
+    clearCache();
+  }
+
+  /**
+   * Clear sum maps.
+   */
+  private void clearCache()
+  {
+    if (!cumulative) {
+      sums.clear();
+      counts.clear();
+    }
+  }
+}