You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/09/01 18:30:49 UTC

[05/12] apex-malhar git commit: Updated algo & working on math operators

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java
deleted file mode 100644
index b0d2e77..0000000
--- a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-
-import javax.validation.constraints.Min;
-
-import org.apache.commons.lang.mutable.MutableDouble;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- * Operator compares consecutive values arriving at input port mapped by keys, emits <key,percent change> pair on output alert port if percent change exceeds percentage threshold set in operator.
- * <p>
- * StateFull : Yes, current key/value is stored in operator for comparison in
- * next successive windows. <br>
- * Partition(s): No, base comparison value will be inconsistent across
- * instantiated copies. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects KeyValPair&lt;K,V extends Number&gt;<br>
- * <b>alert</b>: emits KeyValPair&lt;K,KeyValPair&lt;V,Double&gt;&gt;(1)<br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>threshold</b>: The threshold of change between consecutive tuples of the
- * same key that triggers an alert tuple<br>
- * <b>inverse</b>: if set to true the key in the filter will block tuple<br>
- * <b>filterBy</b>: List of keys to filter on<br>
- * @displayName Change Alert Key Value
- * @category Rules and Alerts
- * @tags change, key value, numeric, percentage
- * @since 0.3.3
- */
-public class ChangeAlertKeyVal<K, V extends Number> extends
-    BaseNumberKeyValueOperator<K, V>
-{
-  /**
-   * Base map is a StateFull field. It is retained across windows
-   */
-  private HashMap<K, MutableDouble> basemap = new HashMap<K, MutableDouble>();
-
-  /**
-   * Input data port that takes a key value pair.
-   */
-  public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
-  {
-    /**
-     * Process each key, compute change or percent, and emit it.
-     */
-    @Override
-    public void process(KeyValPair<K, V> tuple)
-    {
-      K key = tuple.getKey();
-      double tval = tuple.getValue().doubleValue();
-      MutableDouble val = basemap.get(key);
-      if (!doprocessKey(key)) {
-        return;
-      }
-      if (val == null) { // Only process keys that are in the basemap
-        val = new MutableDouble(tval);
-        basemap.put(cloneKey(key), val);
-        return;
-      }
-      double change = tval - val.doubleValue();
-      double percent = (change / val.doubleValue()) * 100;
-      if (percent < 0.0) {
-        percent = 0.0 - percent;
-      }
-      if (percent > percentThreshold) {
-        KeyValPair<V, Double> dmap = new KeyValPair<V, Double>(
-            cloneValue(tuple.getValue()), percent);
-        KeyValPair<K, KeyValPair<V, Double>> otuple = new KeyValPair<K, KeyValPair<V, Double>>(
-            cloneKey(key), dmap);
-        alert.emit(otuple);
-      }
-      val.setValue(tval);
-    }
-  };
-
-  /**
-   * Key,Percent Change output port.
-   */
-  public final transient DefaultOutputPort<KeyValPair<K, KeyValPair<V, Double>>> alert = new DefaultOutputPort<KeyValPair<K, KeyValPair<V, Double>>>();
-
-  /**
-   * Alert thresh hold percentage set by application.
-   */
-  @Min(1)
-  private double percentThreshold = 0.0;
-
-  /**
-   * getter function for threshold value
-   *
-   * @return threshold value
-   */
-  @Min(1)
-  public double getPercentThreshold()
-  {
-    return percentThreshold;
-  }
-
-  /**
-   * setter function for threshold value
-   */
-  public void setPercentThreshold(double d)
-  {
-    percentThreshold = d;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java b/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java
deleted file mode 100644
index e212a2d..0000000
--- a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.validation.constraints.Min;
-
-import org.apache.commons.lang.mutable.MutableDouble;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
-
-/**
- * Operator stores  &lt;key,value&gt; pair in hash map across the windows for comparison and emits hash map of &lt;key,percent change in value for each key&gt; if percent change
- * exceeds preset threshold.
- * <p>
- *
- * StateFull : Yes, key/value pair in current window are stored for comparison in next window. <br>
- * Partition : No, will yield wrong result, base value won't be consistent across instances. <br>
- *
- * <b>Ports</b>:<br>
- * <b>data</b>: expects Map&lt;K,V extends Number&gt;<br>
- * <b>alert</b>: emits HashMap&lt;K,HashMap&lt;V,Double&gt;&gt;(1)<br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>threshold</b>: The threshold of change between consecutive tuples of the same key that triggers an alert tuple<br>
- * <b>inverse</b>: if set to true the key in the filter will block tuple<br>
- * <b>filterBy</b>: List of keys to filter on<br>
- * @displayName Change Alert Map
- * @category Rules and Alerts
- * @tags change, key value, numeric, percentage, map
- * @since 0.3.2
- */
-public class ChangeAlertMap<K, V extends Number> extends BaseNumberKeyValueOperator<K, V>
-{
-  /**
-   * Input data port that takes a map of &lt;key,value&gt;.
-   */
-  public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>()
-  {
-    /**
-     * Process each key, compute change or percent, and emits it.
-     */
-    @Override
-    public void process(Map<K, V> tuple)
-    {
-      for (Map.Entry<K, V> e: tuple.entrySet()) {
-        MutableDouble val = basemap.get(e.getKey());
-        if (!doprocessKey(e.getKey())) {
-          continue;
-        }
-        if (val == null) { // Only process keys that are in the basemap
-          val = new MutableDouble(e.getValue().doubleValue());
-          basemap.put(cloneKey(e.getKey()), val);
-          continue;
-        }
-        double change = e.getValue().doubleValue() - val.doubleValue();
-        double percent = (change / val.doubleValue()) * 100;
-        if (percent < 0.0) {
-          percent = 0.0 - percent;
-        }
-        if (percent > percentThreshold) {
-          HashMap<V,Double> dmap = new HashMap<V,Double>(1);
-          dmap.put(cloneValue(e.getValue()), percent);
-          HashMap<K,HashMap<V,Double>> otuple = new HashMap<K,HashMap<V,Double>>(1);
-          otuple.put(cloneKey(e.getKey()), dmap);
-          alert.emit(otuple);
-        }
-        val.setValue(e.getValue().doubleValue());
-      }
-    }
-  };
-
-  // Default "pass through" unifier works as tuple is emitted as pass through
-  /**
-   * Output port which emits a hashmap of key, percentage change.
-   */
-  public final transient DefaultOutputPort<HashMap<K, HashMap<V,Double>>> alert = new DefaultOutputPort<HashMap<K, HashMap<V,Double>>>();
-
-  /**
-   * basemap is a statefull field. It is retained across windows
-   */
-  private HashMap<K,MutableDouble> basemap = new HashMap<K,MutableDouble>();
-  @Min(1)
-  private double percentThreshold = 0.0;
-
-  /**
-   * getter function for threshold value
-   * @return threshold value
-   */
-  @Min(1)
-  public double getPercentThreshold()
-  {
-    return percentThreshold;
-  }
-
-  /**
-   * setter function for threshold value
-   */
-  public void setPercentThreshold(double d)
-  {
-    percentThreshold = d;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java
deleted file mode 100644
index 3f77052..0000000
--- a/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-
-import org.apache.commons.lang.mutable.MutableDouble;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-
-import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- * Operator compares &lt;key,value&gt; pairs arriving at data and base input ports and stores &lt;key,value&gt; pairs arriving at base port in hash map across the windows.
- * <p/>
- * The &lt;key,value&gt; pairs that arrive at data port are compared with base value if the key exists in the hash map.&nbsp;
- * Change value and percentage are emitted on separate ports.
- * StateFull : Yes, base map values are stored across windows. <br>
- * Partitions : Yes, values on the base port are replicated across all partitions. However the order of tuples on the
- * output stream may change.
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects KeyValPair&lt;K,V extends Number&gt;<br>
- * <b>base</b>: expects KeyValPair&lt;K,V extends Number&gt;<br>
- * <b>change</b>: emits KeyValPair&lt;K,V&gt;(1)<br>
- * <b>percent</b>: emits KeyValPair&lt;K,Double&gt;(1)<br>
- * <br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>inverse</b>: if set to true the key in the filter will block tuple<br>
- * <b>filterBy</b>: List of keys to filter on<br>
- *
- * @displayName Change Key Value
- * @category Math
- * @tags change, key value
- * @since 0.3.3
- */
-public class ChangeKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K, V>
-{
-  /**
-   * basemap is a stateful field. It is retained across windows
-   */
-  private HashMap<K, MutableDouble> basemap = new HashMap<K, MutableDouble>();
-
-  /**
-   * Input data port that takes key value pairs.
-   */
-  public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
-  {
-    /**
-     * Process each key, compute change or percent, and emit it.
-     */
-    @Override
-    public void process(KeyValPair<K, V> tuple)
-    {
-      K key = tuple.getKey();
-      if (!doprocessKey(key)) {
-        return;
-      }
-      MutableDouble bval = basemap.get(key);
-      if (bval != null) { // Only process keys that are in the basemap
-        double cval = tuple.getValue().doubleValue() - bval.doubleValue();
-        change.emit(new KeyValPair<K, V>(cloneKey(key), getValue(cval)));
-        percent.emit(new KeyValPair<K, Double>(cloneKey(key), (cval / bval.doubleValue()) * 100));
-      }
-    }
-  };
-
-  /**
-   * Base value input port, stored in base map for comparison.
-   */
-  public final transient DefaultInputPort<KeyValPair<K, V>> base = new DefaultInputPort<KeyValPair<K, V>>()
-  {
-    /**
-     * Process each key to store the value. If same key appears again update
-     * with latest value.
-     */
-    @Override
-    public void process(KeyValPair<K, V> tuple)
-    {
-      if (tuple.getValue().doubleValue() != 0.0) { // Avoid divide by zero, Emit
-        // an error tuple?
-        MutableDouble val = basemap.get(tuple.getKey());
-        if (val == null) {
-          val = new MutableDouble(0.0);
-          basemap.put(cloneKey(tuple.getKey()), val);
-        }
-        val.setValue(tuple.getValue().doubleValue());
-      }
-    }
-  };
-
-  /**
-   * Key, Change output port.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<KeyValPair<K, V>> change = new DefaultOutputPort<KeyValPair<K, V>>();
-
-  /**
-   * Key, Percentage Change pair output port.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<KeyValPair<K, Double>> percent = new DefaultOutputPort<KeyValPair<K, Double>>();
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java b/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java
deleted file mode 100644
index 66bd7da..0000000
--- a/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.api.annotation.Stateless;
-import com.datatorrent.lib.algo.MatchMap;
-import com.datatorrent.lib.util.UnifierHashMap;
-
-/**
- * Operator compares based on the property "key", "value", and "compare".
- * <p>
- * The comparison is done by getting double value from the Number.
- * Passed tuples are emitted on the output port "compare".&nbsp; 
- * Failed tuples are emitted on port "except".
- * Both output ports are optional, but at least one has to be connected.
- * This module is a pass through<br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects Map&lt;K,V&gt;<br>
- * <b>compare</b>: emits HashMap&lt;K,V&gt;<br>
- * <b>except</b>: emits HashMap&lt;K,V&gt;<br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>key</b>: The key on which compare is done<br>
- * <b>value</b>: The value to compare with<br>
- * <b>cmp</b>: The compare function. Supported values are "lte", "lt", "eq", "neq", "gt", "gte". Default is "eq"<br>
- * <br>
- * Compile time checks<br>
- * Key must be non empty<br>
- * Value must be able to convert to a "double"<br>
- * Compare string, if specified, must be one of "lte", "lt", "eq", "neq", "gt", "gte"<br>
- * <b>Specific run time checks</b>:<br>
- * Does the incoming HashMap have the key<br>
- * Is the value of the key a number<br>
- * <p>
- * <b>Benchmarks</b>: Blast as many tuples as possible in inline mode<br>
- * <table border="1" cellspacing=1 cellpadding=1 summary="Benchmark table for CompareExceptMap&lt;K,V extends Number&gt; operator template">
- * <tr><th>In-Bound</th><th>Out-bound</th><th>Comments</th></tr>
- * <tr><td><b>5 Million K,V pairs/s</b></td><td>Each tuple is emitted if emitError is set to true</td><td>In-bound rate determines performance as every tuple is emitted.
- * Immutable tuples were used in the benchmarking. If you use mutable tuples and have lots of keys, the benchmarks may be lower</td></tr>
- * </table><br>
- * <p>
- * <b>Function Table (K=String, V=Integer); emitError=true; key=a; value=3; cmp=eq)</b>:
- * <table border="1" cellspacing=1 cellpadding=1 summary="Function table for CompareExceptMap&lt;K,V extends Number&gt; operator template">
- * <tr><th rowspan=2>Tuple Type (api)</th><th>In-bound (process)</th><th colspan=2>Out-bound (emit)</th></tr>
- * <tr><th><i>data</i>(HashMap&lt;K,V&gt;)</th><th><i>compare</i>(HashMap&lt;K,V&gt;)</th><th><i>except</i>(HashMap&lt;K,V&gt;)</th></tr>
- * <tr><td>Begin Window (beginWindow())</td><td>N/A</td><td>N/A</td><td>N/A</td></tr>
- * <tr><td>Data (process())</td><td>{a=2,b=20,c=1000}</td><td></td><td>{a=2,b=20,c=1000}</td></tr>
- * <tr><td>Data (process())</td><td>{a=3,b=40,c=2}</td><td>{a=3,b=40,c=2}</td><td></td></tr>
- * <tr><td>Data (process())</td><td>{a=10,b=5}</td><td></td><td>{a=10,b=5}</td></tr>
- * <tr><td>Data (process())</td><td>{d=55,b=12}</td><td></td><td>{d=55,b=12}</td></tr>
- * <tr><td>Data (process())</td><td>{d=22,a=4}</td><td></td><td>{d=22,a=4}</td></tr>
- * <tr><td>Data (process())</td><td>{d=4,a=3,g=5,h=44}</td><td>{d=4,a=3,g=5,h=44}</td><td></td></tr>
- * <tr><td>End Window (endWindow())</td><td>N/A</td><td>N/A</td><td>N/A</td></tr>
- * </table>
- * <br>
- * <br>
- * @displayName Compare Except Map
- * @category Math
- * @tags comparison, key value, number, hash map
- * @since 0.3.2
- */
-@Stateless
-public class CompareExceptMap<K, V extends Number> extends MatchMap<K, V>
-{
-  /**
-   * Output port that emits a hashmap of matched tuples after comparison.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<HashMap<K, V>> compare = match;
-  
-  /**
-   * Output port that emits a hashmap of non matching tuples after comparison.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<HashMap<K, V>> except = new DefaultOutputPort<HashMap<K, V>>()
-  {
-    @Override
-    public Unifier<HashMap<K, V>> getUnifier()
-    {
-      return new UnifierHashMap<K, V>();
-    }
-  };
-
-  /**
-   * Emits if compare port is connected
-   * @param tuple
-   */
-  @Override
-  public void tupleMatched(Map<K, V> tuple)
-  {
-    if (compare.isConnected()) {
-      compare.emit(cloneTuple(tuple));
-    }
-  }
-
-  /**
-   * Emits if except port is connected
-   * @param tuple
-   */
-  @Override
-  public void tupleNotMatched(Map<K, V> tuple)
-  {
-    if (except.isConnected()) {
-      except.emit(cloneTuple(tuple));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/CompareMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/CompareMap.java b/library/src/main/java/com/datatorrent/lib/math/CompareMap.java
deleted file mode 100644
index 3636207..0000000
--- a/library/src/main/java/com/datatorrent/lib/math/CompareMap.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.Stateless;
-import com.datatorrent.lib.algo.MatchMap;
-
-/**
- * This operator compares tuples subclassed from Number based on the property "key", "value", and "cmp", and matching tuples are emitted.
- * <p>
- * If the tuple passed the test, it is emitted on the output port "compare".&nbsp; The comparison is done by getting double value from the Number.
- * Both output ports are optional, but at least one has to be connected.
- * This module is a pass through<br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects Map&lt;K,V extends Number&gt;<br>
- * <b>compare</b>: emits HashMap&lt;K,V&gt;<br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>key</b>: The key on which compare is done<br>
- * <b>value</b>: The value to compare with<br>
- * <b>cmp</b>: The compare function. Supported values are "lte", "lt", "eq", "neq", "gt", "gte". Default is "eq"<br>
- * <br>
- * <b>Compile time checks</b>:<br>
- * Key must be non empty<br>
- * Value must be able to convert to a "double"<br>
- * CompareMap string, if specified, must be one of "lte", "lt", "eq", "neq", "gt", "gte"<br>
- * <br>
- * <b>Specific run time checks</b>:<br>
- * Does the incoming HashMap have the key<br>
- * Is the value of the key a number<br>
- * <p>
- * <b>Benchmarks</b>: Blast as many tuples as possible in inline mode<br>
- * <table border="1" cellspacing=1 cellpadding=1 summary="Benchmark table for CompareMap&lt;K,V extends Number&gt; operator template">
- * <tr><th>In-Bound</th><th>Out-bound</th><th>Comments</th></tr>
- * <tr><td><b>8 Million K,V pairs/s</b></td><td>Each matched tuple is emitted</td><td>In-bound rate and number of tuples that match determine performance.
- * Immutable tuples were used in the benchmarking. If you use mutable tuples and have lots of keys, the benchmarks may be lower</td></tr>
- * </table><br>
- * <p>
- * <b>Function Table (K=String,V=Integer); emitError=true; key=a; value=3; cmp=eq)</b>:
- * <table border="1" cellspacing=1 cellpadding=1 summary="Function table for CompareMap&lt;K,V extends Number&gt; operator template">
- * <tr><th rowspan=2>Tuple Type (api)</th><th>In-bound (process)</th><th>Out-bound (emit)</th></tr>
- * <tr><th><i>data</i>(Map&lt;K,V&gt;)</th><th><i>compare</i>(HashMap&lt;K,V&gt;)</th></tr>
- * <tr><td>Begin Window (beginWindow())</td><td>N/A</td><td>N/A</td></tr>
- * <tr><td>Data (process())</td><td>{a=2,b=20,c=1000}</td><td></td></tr>
- * <tr><td>Data (process())</td><td>{a=3,b=40,c=2}</td><td>{a=3,b=40,c=2}</td></tr>
- * <tr><td>Data (process())</td><td>{a=10,b=5}</td><td></td></tr>
- * <tr><td>Data (process())</td><td>{d=55,b=12}</td><td></td></tr>
- * <tr><td>Data (process())</td><td>{d=22,a=4}</td><td></td></tr>
- * <tr><td>Data (process())</td><td>{d=4,a=3,g=5,h=44}</td><td>{d=4,a=3,g=5,h=44}</td></tr>
- * <tr><td>End Window (endWindow())</td><td>N/A</td><td>N/A</td></tr>
- * </table>
- * <br>
- * <br>
- * @displayName Compare Map
- * @category Math
- * @tags comparison, key value, numeric, map
- * @since 0.3.2
- */
-@Stateless
-public class CompareMap<K, V extends Number> extends MatchMap<K,V>
-{
-  /**
-   * Output port that emits a hashmap of matching number tuples after comparison.
-   */
-  public final transient DefaultOutputPort<HashMap<K, V>> compare = match;
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java
deleted file mode 100644
index d593020..0000000
--- a/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.lang.mutable.MutableInt;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.StreamCodec;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.lib.util.BaseKeyValueOperator;
-import com.datatorrent.lib.util.KeyValPair;
-import com.datatorrent.lib.util.UnifierCountOccurKey;
-
-/**
- * This Operator aggregates occurrence of keys in &lt;key,value&gt; pair at input port.&lt;Key,Occurrence count&gt; pair is emitted for each input on output port.
- * <p>
- * <br>
- * StateFull : Yes, key occurrence is aggregated over windows. <br>
- * Partitions : Yes, count occurrence unifier at output port. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects KeyValPair&lt;K,V&gt;<br>
- * <b>count</b>: emits KeyValPair&lt;K,Integer&gt;</b><br>
- * <br>
- * @displayName Count Key Value
- * @category Math
- * @tags count, key value, aggregate
- * @since 0.3.3
- */
-public class CountKeyVal<K, V> extends BaseKeyValueOperator<K, V>
-{
-
-  /**
-   * Key occurrence count map.
-   */
-  protected HashMap<K, MutableInt> counts = new HashMap<K, MutableInt>();
-
-  /**
-   * Input data port that takes key value pair.
-   */
-  public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
-  {
-    /**
-     * For each tuple (a key value pair): Adds the values for each key, Counts
-     * the number of occurrence of each key
-     */
-    @Override
-    public void process(KeyValPair<K, V> tuple)
-    {
-      K key = tuple.getKey();
-      MutableInt count = counts.get(key);
-      if (count == null) {
-        count = new MutableInt(0);
-        counts.put(cloneKey(key), count);
-      }
-      count.increment();
-    }
-
-    @Override
-    public StreamCodec<KeyValPair<K, V>> getStreamCodec()
-    {
-      return getKeyValPairStreamCodec();
-    }
-  };
-
-  /**
-   * Key, occurrence value pair output port.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<KeyValPair<K, Integer>> count = new DefaultOutputPort<KeyValPair<K, Integer>>()
-  {
-    @Override
-    public UnifierCountOccurKey<K> getUnifier()
-    {
-      return new UnifierCountOccurKey<K>();
-    }
-  };
-
-  /**
-   * Emits on all ports that are connected. Data is computed during process on
-   * input port and endWindow just emits it for each key. Clears the internal
-   * data if resetAtEndWindow is true.
-   */
-  @SuppressWarnings({ "unchecked", "rawtypes" })
-  @Override
-  public void endWindow()
-  {
-    for (Map.Entry<K, MutableInt> e : counts.entrySet()) {
-      count.emit(new KeyValPair(e.getKey(),
-          new Integer(e.getValue().intValue())));
-    }
-    counts.clear();
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java b/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java
deleted file mode 100644
index ddef880..0000000
--- a/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.Stateless;
-import com.datatorrent.lib.algo.MatchMap;
-import com.datatorrent.lib.util.UnifierHashMap;
-
-/**
- * This operator does comparison on tuple sub-classed from Number based on the property "key", "value", and "cmp", and not matched tuples are emitted.
- * <p>
- * The comparison is done by getting double value from the Number. Both output ports
- * are optional, but at least one has to be connected
- * <p>
- * This module is a pass through<br>
- * <br>
- * <br>
- * StateFull : No, output is emitted in current window. <br>
- * Partitions : Yes, No state dependency among input tuples. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects Map&lt;K,V extends Number&gt;<br>
- * <b>except</b>: emits HashMap&lt;K,V&gt;<br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>key</b>: The key on which compare is done<br>
- * <b>value</b>: The value to compare with<br>
- * <b>cmp</b>: The compare function. Supported values are "lte", "lt", "eq",
- * "neq", "gt", "gte". Default is "eq"<br>
- * <br>
- * <b>Compile time checks</b>:<br>
- * Key must be non empty<br>
- * Value must be able to convert to a "double"<br>
- * Compare string, if specified, must be one of "lte", "lt", "eq", "neq", "gt",
- * "gte"<br>
- * <br>
- * <b>Run time checks</b>:<br>
- * Does the incoming HashMap have the key, Is the value of the key a number<br>
- * <br>
- * @displayName Except Map
- * @category Math
- * @tags comparison, Number
- * @since 0.3.3
- */
-@Stateless
-public class ExceptMap<K, V extends Number> extends MatchMap<K, V>
-{       
-        /**
-         * Output port that emits non matching number tuples.
-         */
-  public final transient DefaultOutputPort<HashMap<K, V>> except = new DefaultOutputPort<HashMap<K, V>>()
-  {
-    @Override
-    public Unifier<HashMap<K, V>> getUnifier()
-    {
-      return new UnifierHashMap<K, V>();
-    }
-  };
-
-  /**
-   * Does nothing. Overrides base as call super.tupleMatched() would emit the
-   * tuple
-   *
-   * @param tuple
-   */
-  @Override
-  public void tupleMatched(Map<K, V> tuple)
-  {
-  }
-
-  /**
-   * Emits the tuple. Calls cloneTuple to get a copy, allowing users to override
-   * in case objects are mutable
-   *
-   * @param tuple
-   */
-  @Override
-  public void tupleNotMatched(Map<K, V> tuple)
-  {
-    except.emit(cloneTuple(tuple));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/Quotient.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Quotient.java b/library/src/main/java/com/datatorrent/lib/math/Quotient.java
deleted file mode 100644
index ed08e86..0000000
--- a/library/src/main/java/com/datatorrent/lib/math/Quotient.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-import com.datatorrent.lib.util.BaseNumberValueOperator;
-
-/**
- * This operator adds all the values on "numerator" and "denominator" and emits quotient at end of window. 
- * <p>
- * <br>
- * <b>StateFull : Yes </b>, Sum of values is taken over application window. <br>
- * <b>Partitions : No </b>, will yield wrong results, since values are
- * accumulated over application window. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>numerator</b>: expects V extends Number<br>
- * <b>denominator</b>: expects V extends Number<br>
- * <b>quotient</b>: emits Double<br>
- * <br>
- * <b>Properties : </b> <br>
- * <b>mult_by : </b>Multiply by value(default = 1). <br>
- * <br>
- * @displayName Quotient
- * @category Math
- * @tags division, sum, numeric
- * @since 0.3.3
- */
-@OperatorAnnotation(partitionable = false)
-public class Quotient<V extends Number> extends BaseNumberValueOperator<V>
-{
-  protected double nval = 0.0;
-  protected double dval = 0.0;
-  int mult_by = 1;
-
-  /**
-   * Numerator values input port.
-   */
-  public final transient DefaultInputPort<V> numerator = new DefaultInputPort<V>()
-  {
-    /**
-     * Adds to the numerator value
-     */
-    @Override
-    public void process(V tuple)
-    {
-      nval += tuple.doubleValue();
-    }
-  };
-
-  /**
-   * Denominator values input port.
-   */
-  public final transient DefaultInputPort<V> denominator = new DefaultInputPort<V>()
-  {
-    /**
-     * Adds to the denominator value
-     */
-    @Override
-    public void process(V tuple)
-    {
-      dval += tuple.doubleValue();
-    }
-  };
-
-  /**
-   * Quotient output port.
-   */
-  public final transient DefaultOutputPort<V> quotient = new DefaultOutputPort<V>();
-
-  public void setMult_by(int i)
-  {
-    mult_by = i;
-  }
-
-  /**
-   * Generates tuple emits it as long as denominator is not 0. Clears internal
-   * data
-   */
-  @Override
-  public void endWindow()
-  {
-    if (dval == 0) {
-      return;
-    }
-    double val = (nval / dval) * mult_by;
-    quotient.emit(getValue(val));
-    nval = 0.0;
-    dval = 0.0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/QuotientMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/QuotientMap.java b/library/src/main/java/com/datatorrent/lib/math/QuotientMap.java
deleted file mode 100644
index a10fe95..0000000
--- a/library/src/main/java/com/datatorrent/lib/math/QuotientMap.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.validation.constraints.Min;
-
-import org.apache.commons.lang.mutable.MutableDouble;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
-
-/**
- * Add all the values for each key on "numerator" and "denominator" and emits quotient at end of window for all keys in the denominator. 
- * <p>
- * <br>
- * Application can set multiplication value for quotient(default = 1). <br>
- * Operator will calculate quotient of occurrence of key in numerator divided by
- * occurrence of key in denominator if countKey flag is true. <br>
- * Application can allow or block keys by setting filter key and inverse flag. <br>
- * <br>
- * <b>StateFull : Yes</b>, numerator/denominator values are summed over
- * application window. <br>
- * <b>Partitions : No, </b>, will yield wrong results, since values are summed
- * over app window. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>numerator</b>: expects Map&lt;K,V extends Number&gt;<br>
- * <b>denominator</b>: expects Map&lt;K,V extends Number&gt;<br>
- * <b>quotient</b>: emits HashMap&lt;K,Double&gt;<br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>inverse :</b> if set to true the key in the filter will block tuple<br>
- * <b>filterBy :</b> List of keys to filter on<br>
- * <b>countkey :</b> Get quotient of occurrence of keys in numerator and
- * denominator. <br>
- * <b>mult_by :</b> Set multiply by constant value. <br>
- * <br>
- * @displayName Quotient Map
- * @category Math
- * @tags division, sum, map
- * @since 0.3.3
- */
-@OperatorAnnotation(partitionable = false)
-public class QuotientMap<K, V extends Number> extends
-    BaseNumberKeyValueOperator<K, V>
-{
-  /**
-   * Numerator key/sum value map.
-   */
-  protected HashMap<K, MutableDouble> numerators = new HashMap<K, MutableDouble>();
-
-  /**
-   * Denominator key/sum value map.
-   */
-  protected HashMap<K, MutableDouble> denominators = new HashMap<K, MutableDouble>();
-
-  /**
-   * Count occurrence of keys if set to true.
-   */
-  boolean countkey = false;
-
-  /**
-   * Quotient multiply by value.
-   */
-  int mult_by = 1;
-
-  /**
-   * Numerator input port.
-   */
-  public final transient DefaultInputPort<Map<K, V>> numerator = new DefaultInputPort<Map<K, V>>()
-  {
-    /**
-     * Added tuple to the numerator hash
-     */
-    @Override
-    public void process(Map<K, V> tuple)
-    {
-      addTuple(tuple, numerators);
-    }
-  };
-
-  /**
-   * Denominator input port.
-   */
-  public final transient DefaultInputPort<Map<K, V>> denominator = new DefaultInputPort<Map<K, V>>()
-  {
-    /**
-     * Added tuple to the denominator hash
-     */
-    @Override
-    public void process(Map<K, V> tuple)
-    {
-      addTuple(tuple, denominators);
-    }
-  };
-
-  /**
-   * Quotient output port.
-   */
-  public final transient DefaultOutputPort<HashMap<K, Double>> quotient = new DefaultOutputPort<HashMap<K, Double>>();
-
-  /**
-   * Add tuple to nval/dval map.
-   *
-   * @param tuple
-   *          key/value map on input port.
-   * @param map
-   *          key/summed value map.
-   */
-  public void addTuple(Map<K, V> tuple, Map<K, MutableDouble> map)
-  {
-    for (Map.Entry<K, V> e : tuple.entrySet()) {
-      addEntry(e.getKey(), e.getValue(), map);
-    }
-  }
-
-  /**
-   * Add/Update entry to key/sum value map.
-   *
-   * @param key
-   *          name.
-   * @param value
-   *          value for key.
-   * @param map
-   *          numerator/denominator key/sum map.
-   */
-  public void addEntry(K key, V value, Map<K, MutableDouble> map)
-  {
-    if (!doprocessKey(key) || (value == null)) {
-      return;
-    }
-    MutableDouble val = map.get(key);
-    if (val == null) {
-      if (countkey) {
-        val = new MutableDouble(1.00);
-      } else {
-        val = new MutableDouble(value.doubleValue());
-      }
-    } else {
-      if (countkey) {
-        val.increment();
-      } else {
-        val.add(value.doubleValue());
-      }
-    }
-    map.put(cloneKey(key), val);
-  }
-
-  /**
-   * getter for mult_by
-   *
-   * @return mult_by
-   */
-
-  @Min(0)
-  public int getMult_by()
-  {
-    return mult_by;
-  }
-
-  /**
-   * getter for countkey
-   *
-   * @return countkey
-   */
-  public boolean getCountkey()
-  {
-    return countkey;
-  }
-
-  /**
-   * Setter for mult_by
-   *
-   * @param i
-   */
-  public void setMult_by(int i)
-  {
-    mult_by = i;
-  }
-
-  /**
-   * setter for countkey
-   *
-   * @param i
-   *          sets countkey
-   */
-  public void setCountkey(boolean i)
-  {
-    countkey = i;
-  }
-
-  /**
-   * Generates tuples for each key and emits them. Only keys that are in the
-   * denominator are iterated on If the key is only in the numerator, it gets
-   * ignored (cannot do divide by 0) Clears internal data
-   */
-  @Override
-  public void endWindow()
-  {
-    HashMap<K, Double> tuples = new HashMap<K, Double>();
-    for (Map.Entry<K, MutableDouble> e : denominators.entrySet()) {
-      MutableDouble nval = numerators.get(e.getKey());
-      if (nval == null) {
-        tuples.put(e.getKey(), new Double(0.0));
-      } else {
-        tuples.put(e.getKey(), new Double((nval.doubleValue() / e.getValue()
-            .doubleValue()) * mult_by));
-      }
-    }
-    if (!tuples.isEmpty()) {
-      quotient.emit(tuples);
-    }
-    numerators.clear();
-    denominators.clear();
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/SumCountMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/SumCountMap.java b/library/src/main/java/com/datatorrent/lib/math/SumCountMap.java
deleted file mode 100644
index c2d8465..0000000
--- a/library/src/main/java/com/datatorrent/lib/math/SumCountMap.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.lang.mutable.MutableDouble;
-import org.apache.commons.lang.mutable.MutableInt;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
-import com.datatorrent.lib.util.UnifierHashMapInteger;
-import com.datatorrent.lib.util.UnifierHashMapSumKeys;
-
-/**
- * Emits the sum and count of values for each key at the end of window.
- * <p>
- * Application accumulate sum across streaming window by setting cumulative flag
- * to true. <br>
- * This is an end of window operator<br>
- * <br>
- * <b>StateFull : Yes</b>, Sum is computed over application window and streaming
- * window. <br>
- * <b>Partitions : Yes</b>, Sum is unified at output port. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects Map&lt;K,V extends Number&gt;<br>
- * <b>sum</b>: emits HashMap&lt;K,V&gt;<br>
- * <b>count</b>: emits HashMap&lt;K,Integer&gt;</b><br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>inverse</b>: if set to true the key in the filter will block tuple<br>
- * <b>filterBy</b>: List of keys to filter on<br>
- * <b>cumulative</b>: boolean flag, if set the sum is not cleared at the end of
- * window, <br>
- * hence generating cumulative sum across streaming windows. Default is false.<br>
- * <br>
- * @displayName Sum Count Map
- * @category Math
- * @tags  number, sum, counting, map
- * @since 0.3.3
- */
-public class SumCountMap<K, V extends Number> extends
-    BaseNumberKeyValueOperator<K, V>
-{
-  /**
-   * Key/double sum map.
-   */
-  protected HashMap<K, MutableDouble> sums = new HashMap<K, MutableDouble>();
-
-  /**
-   * Key/integer sum map.
-   */
-  protected HashMap<K, MutableInt> counts = new HashMap<K, MutableInt>();
-
-  /**
-   * Cumulative sum flag.
-   */
-  protected boolean cumulative = false;
-
-  /**
-   * Input port that takes a map.&nbsp; It adds the values for each key and counts the number of occurrences for each key.
-   */
-  public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>()
-  {
-    /**
-     * For each tuple (a HashMap of keys,val pairs) Adds the values for each
-     * key, Counts the number of occurrences of each key
-     */
-    @Override
-    public void process(Map<K, V> tuple)
-    {
-      for (Map.Entry<K, V> e : tuple.entrySet()) {
-        K key = e.getKey();
-        if (!doprocessKey(key)) {
-          continue;
-        }
-        if (sum.isConnected()) {
-          MutableDouble val = sums.get(key);
-          if (val == null) {
-            val = new MutableDouble(e.getValue().doubleValue());
-          } else {
-            val.add(e.getValue().doubleValue());
-          }
-          sums.put(cloneKey(key), val);
-        }
-        if (SumCountMap.this.count.isConnected()) {
-          MutableInt count = counts.get(key);
-          if (count == null) {
-            count = new MutableInt(0);
-            counts.put(cloneKey(key), count);
-          }
-          count.increment();
-        }
-      }
-    }
-  };
-
-  /**
-   * Key,sum map output port.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<HashMap<K, V>> sum = new DefaultOutputPort<HashMap<K, V>>()
-  {
-    @Override
-    public Unifier<HashMap<K, V>> getUnifier()
-    {
-      return new UnifierHashMapSumKeys<K, V>();
-    }
-  };
-
-  /**
-   * Key,double sum map output port.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<HashMap<K, Double>> sumDouble = new DefaultOutputPort<HashMap<K, Double>>()
-  {
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    @Override
-    public Unifier<HashMap<K, Double>> getUnifier()
-    {
-      UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Double>();
-      ret.setType(Double.class);
-      return ret;
-    }
-  };
-
-  /**
-   * Key,integer sum output port.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<HashMap<K, Integer>> sumInteger = new DefaultOutputPort<HashMap<K, Integer>>()
-  {
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    @Override
-    public Unifier<HashMap<K, Integer>> getUnifier()
-    {
-      UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Integer>();
-      ret.setType(Integer.class);
-      return ret;
-    }
-  };
-
-
-        /**
-   * Key,long sum output port.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<HashMap<K, Long>> sumLong = new DefaultOutputPort<HashMap<K, Long>>()
-  {
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    @Override
-    public Unifier<HashMap<K, Long>> getUnifier()
-    {
-      UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Long>();
-      ret.setType(Long.class);
-      return ret;
-    }
-  };
-        
-        /**
-   * Key,short sum output port.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<HashMap<K, Short>> sumShort = new DefaultOutputPort<HashMap<K, Short>>()
-  {
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    @Override
-    public Unifier<HashMap<K, Short>> getUnifier()
-    {
-      UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Short>();
-      ret.setType(Short.class);
-      return ret;
-    }
-  };
-        
-        /**
-   * Key,float sum output port.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<HashMap<K, Float>> sumFloat = new DefaultOutputPort<HashMap<K, Float>>()
-  {
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    @Override
-    public Unifier<HashMap<K, Float>> getUnifier()
-    {
-      UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Float>();
-      ret.setType(Float.class);
-      return ret;
-    }
-  };
-        
-        /**
-   * Key,integer sum output port.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<HashMap<K, Integer>> count = new DefaultOutputPort<HashMap<K, Integer>>()
-  {
-    @Override
-    public Unifier<HashMap<K, Integer>> getUnifier()
-    {
-      return new UnifierHashMapInteger<K>();
-    }
-  };
-
-  /**
-   * Get cumulative flag.
-   *
-   * @return cumulative flag
-   */
-  public boolean isCumulative()
-  {
-    return cumulative;
-  }
-
-  /**
-   * set cumulative flag.
-   *
-   * @param cumulative
-   *          input flag
-   */
-  public void setCumulative(boolean cumulative)
-  {
-    this.cumulative = cumulative;
-  }
-
-  /**
-   * Emits on all ports that are connected. Data is precomputed during process
-   * on input port endWindow just emits it for each key Clears the internal data
-   * before return
-   */
-  @Override
-  public void endWindow()
-  {
-
-    // Should allow users to send each key as a separate tuple to load balance
-    // This is an aggregate node, so load balancing would most likely not be
-    // needed
-
-    HashMap<K, V> tuples = new HashMap<K, V>();
-    HashMap<K, Integer> ctuples = new HashMap<K, Integer>();
-    HashMap<K, Double> dtuples = new HashMap<K, Double>();
-    HashMap<K, Integer> ituples = new HashMap<K, Integer>();
-    HashMap<K, Float> ftuples = new HashMap<K, Float>();
-    HashMap<K, Long> ltuples = new HashMap<K, Long>();
-    HashMap<K, Short> stuples = new HashMap<K, Short>();
-
-    for (Map.Entry<K, MutableDouble> e : sums.entrySet()) {
-      K key = e.getKey();
-      MutableDouble val = e.getValue();
-      tuples.put(key, getValue(val.doubleValue()));
-      dtuples.put(key, val.doubleValue());
-      ituples.put(key, val.intValue());
-      ftuples.put(key, val.floatValue());
-      ltuples.put(key, val.longValue());
-      stuples.put(key, val.shortValue());
-      // ctuples.put(key, counts.get(e.getKey()).toInteger());
-      MutableInt c = counts.get(e.getKey());
-      if (c != null) {
-        ctuples.put(key, c.toInteger());
-      }
-    }
-
-    sum.emit(tuples);
-    sumDouble.emit(dtuples);
-    sumInteger.emit(ituples);
-    sumLong.emit(ltuples);
-    sumShort.emit(stuples);
-    sumFloat.emit(ftuples);
-    count.emit(ctuples);
-    clearCache();
-  }
-
-  /**
-   * Clear sum maps.
-   */
-  private void clearCache()
-  {
-    if (!cumulative) {
-      sums.clear();
-      counts.clear();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/AbstractSqlStreamOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/AbstractSqlStreamOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/AbstractSqlStreamOperator.java
deleted file mode 100644
index e3bba8a..0000000
--- a/library/src/main/java/com/datatorrent/lib/streamquery/AbstractSqlStreamOperator.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-
-/**
- * A base implementation of a BaseOperator that is a sql stream operator.&nbsp;  Subclasses should provide the
-   implementation of how to process the tuples.
- * <p>
- * Abstract sql db input operator.
- * <p>
- * @displayName Abstract Sql Stream
- * @category Stream Manipulators
- * @tags sql operator
- * @since 0.3.2
- */
-public abstract class AbstractSqlStreamOperator extends BaseOperator
-{
-  public static class InputSchema
-  {
-    public static class ColumnInfo
-    {
-      public String type;
-      public int bindIndex = 0;
-      public boolean isColumnIndex = false;
-    }
-
-    /**
-     * the name of the input "table"
-     */
-    public String name;
-    /**
-     * key is the name of the column, and value is the SQL type
-     */
-    public HashMap<String, ColumnInfo> columnInfoMap = new HashMap<String, ColumnInfo>();
-
-    public InputSchema()
-    {
-    }
-
-    public InputSchema(String name)
-    {
-      this.name = name;
-    }
-
-    public void setColumnInfo(String columnName, String columnType, boolean isColumnIndex)
-    {
-      ColumnInfo t = new ColumnInfo();
-      t.type = columnType;
-      t.isColumnIndex = isColumnIndex;
-      columnInfoMap.put(columnName, t);
-    }
-
-  }
-
-  protected String statement;
-  protected ArrayList<InputSchema> inputSchemas = new ArrayList<InputSchema>(5);
-  protected transient ArrayList<Object> bindings;
-
-  /**
-   * Input bindings port that takes an arraylist of objects.
-   */
-  @InputPortFieldAnnotation(optional = true)
-  public final transient DefaultInputPort<ArrayList<Object>> bindingsPort = new DefaultInputPort<ArrayList<Object>>()
-  {
-    @Override
-    public void process(ArrayList<Object> tuple)
-    {
-      bindings = tuple;
-    }
-
-  };
-
-  /**
-   * Input port in1 that takes a hashmap of &lt;string,object&gt;.
-   */
-  public final transient DefaultInputPort<HashMap<String, Object>> in1 = new DefaultInputPort<HashMap<String, Object>>()
-  {
-    @Override
-    public void process(HashMap<String, Object> tuple)
-    {
-      processTuple(0, tuple);
-    }
-
-  };
-
-  /**
-   * Input port in2 that takes a hashmap of &lt;string,object&gt;.
-   */
-  @InputPortFieldAnnotation(optional = true)
-  public final transient DefaultInputPort<HashMap<String, Object>> in2 = new DefaultInputPort<HashMap<String, Object>>()
-  {
-    @Override
-    public void process(HashMap<String, Object> tuple)
-    {
-      processTuple(1, tuple);
-    }
-
-  };
-
-  /**
-   * Input port in3 that takes a hashmap of &lt;string,object&gt;.
-   */
-  @InputPortFieldAnnotation(optional = true)
-  public final transient DefaultInputPort<HashMap<String, Object>> in3 = new DefaultInputPort<HashMap<String, Object>>()
-  {
-    @Override
-    public void process(HashMap<String, Object> tuple)
-    {
-      processTuple(2, tuple);
-    }
-
-  };
-
-  /**
-   * Input port in4 that takes a hashmap of &lt;string,object&gt;.
-   */
-  @InputPortFieldAnnotation(optional = true)
-  public final transient DefaultInputPort<HashMap<String, Object>> in4 = new DefaultInputPort<HashMap<String, Object>>()
-  {
-    @Override
-    public void process(HashMap<String, Object> tuple)
-    {
-      processTuple(3, tuple);
-    }
-
-  };
-
-  /**
-   * Input port in5 that takes a hashmap of &lt;string,object&gt;.
-   */
-  @InputPortFieldAnnotation(optional = true)
-  public final transient DefaultInputPort<HashMap<String, Object>> in5 = new DefaultInputPort<HashMap<String, Object>>()
-  {
-    @Override
-    public void process(HashMap<String, Object> tuple)
-    {
-      processTuple(4, tuple);
-    }
-
-  };
-
-  /**
-   * Output result port that emits a hashmap of &lt;string,object&gt;.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<HashMap<String, Object>> result = new DefaultOutputPort<HashMap<String, Object>>();
-
-  public void setStatement(String statement)
-  {
-    this.statement = statement;
-  }
-
-  public String getStatement()
-  {
-    return this.statement;
-  }
-
-  public void setInputSchema(int inputPortIndex, InputSchema inputSchema)
-  {
-    inputSchemas.add(inputPortIndex, inputSchema);
-  }
-
-  public abstract void processTuple(int tableNum, HashMap<String, Object> tuple);
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/DeleteOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/DeleteOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/DeleteOperator.java
deleted file mode 100644
index 77c7522..0000000
--- a/library/src/main/java/com/datatorrent/lib/streamquery/DeleteOperator.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.Map;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.lib.streamquery.condition.Condition;
-
-/**
- * An implementation of BaseOperator that provides sql delete query semantic on live data stream. <br>
- * <p>
- * Stream rows passing condition are emitted on output port stream. <br>
- * <br>
- * <b>StateFull : NO,</b> all row data is processed in current time window. <br>
- * <b>Partitions : Yes, </b> No Input dependency among input rows. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b> inport : </b> Input hash map(row) port, expects
- * HashMap&lt;String,Object&gt;<<br>
- * <b> outport : </b> Output hash map(row) port, emits
- * HashMap&lt;String,Object&gt;<br>
- * <br>
- * <b> Properties : <b> <br>
- * <b> condition : </b> Select condition for selecting rows. <br>
- * <b> columns : </b> Column names/aggregate functions for select. <br>
- * <br>
- * @displayName Delete
- * @category Stream Manipulators
- * @tags sql delete operator
- * @since 0.3.3
- */
-public class DeleteOperator extends BaseOperator
-{
-
-  /**
-   * condition.
-   */
-  private Condition condition = null;
-
-  /**
-   * set condition.
-   */
-  public void setCondition(Condition condition)
-  {
-    this.condition = condition;
-  }
-
-  /**
-   * Input port that takes a map of &lt;string,object&gt;.
-   */
-  public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
-  {
-
-    @Override
-    public void process(Map<String, Object> tuple)
-    {
-      if ((condition != null) && (!condition.isValidRow(tuple))) {
-        outport.emit(tuple);
-      }
-    }
-  };
-
-  /**
-   * Output port emits a map of &lt;string,object&gt;.
-   */
-  public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>();
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/DerbySqlStreamOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/DerbySqlStreamOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/DerbySqlStreamOperator.java
deleted file mode 100644
index 2fe8bc3..0000000
--- a/library/src/main/java/com/datatorrent/lib/streamquery/DerbySqlStreamOperator.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.lib.streamquery.AbstractSqlStreamOperator.InputSchema.ColumnInfo;
-
-/**
- * An implementation of AbstractSqlStreamOperator that provides embedded derby sql input operator.
- * <p>
- * @displayName Derby Sql Stream
- * @category Stream Manipulators
- * @tags sql, in-memory, input operator
- * @since 0.3.2
- */
-public class DerbySqlStreamOperator extends AbstractSqlStreamOperator
-{
-  protected transient ArrayList<PreparedStatement> insertStatements = new ArrayList<PreparedStatement>(5);
-  protected List<String> execStmtStringList = new ArrayList<String>();
-  protected transient ArrayList<PreparedStatement> execStatements = new ArrayList<PreparedStatement>(5);
-  protected transient ArrayList<PreparedStatement> deleteStatements = new ArrayList<PreparedStatement>(5);
-  protected transient Connection db;
-
-  public void addExecStatementString(String stmt)
-  {
-    this.execStmtStringList.add(stmt);
-  }
-
-
-  @Override
-  public void setup(OperatorContext context)
-  {
-    System.setProperty("derby.stream.error.file", "/dev/null");
-    try {
-      Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance();
-    } catch (Exception ex) {
-      throw new RuntimeException(ex);
-    }
-
-    String connUrl = "jdbc:derby:memory:MALHAR_TEMP;create=true";
-    PreparedStatement st;
-
-    try {
-      db = DriverManager.getConnection(connUrl);
-      // create the temporary tables here
-      for (int i = 0; i < inputSchemas.size(); i++) {
-        InputSchema inputSchema = inputSchemas.get(i);
-        if (inputSchema == null || inputSchema.columnInfoMap.isEmpty()) {
-          continue;
-        }
-        String columnSpec = "";
-        String columnNames = "";
-        String insertQuestionMarks = "";
-        int j = 0;
-        for (Map.Entry<String, ColumnInfo> entry : inputSchema.columnInfoMap.entrySet()) {
-          if (!columnSpec.isEmpty()) {
-            columnSpec += ",";
-            columnNames += ",";
-            insertQuestionMarks += ",";
-          }
-          columnSpec += entry.getKey();
-          columnSpec += " ";
-          columnSpec += entry.getValue().type;
-          columnNames += entry.getKey();
-          insertQuestionMarks += "?";
-          entry.getValue().bindIndex = ++j;
-        }
-        String createTempTableStmt =
-            "DECLARE GLOBAL TEMPORARY TABLE SESSION." + inputSchema.name + "(" + columnSpec + ") NOT LOGGED";
-        st = db.prepareStatement(createTempTableStmt);
-        st.execute();
-        st.close();
-
-        String insertStmt = "INSERT INTO SESSION." + inputSchema.name + " (" + columnNames + ") VALUES ("
-            + insertQuestionMarks + ")";
-
-        insertStatements.add(i, db.prepareStatement(insertStmt));
-        deleteStatements.add(i, db.prepareStatement("DELETE FROM SESSION." + inputSchema.name));
-      }
-      for (String stmtStr : execStmtStringList) {
-        execStatements.add(db.prepareStatement(stmtStr));
-      }
-    } catch (SQLException ex) {
-      throw new RuntimeException(ex);
-    }
-  }
-
-  @Override
-  public void beginWindow(long windowId)
-  {
-    try {
-      db.setAutoCommit(false);
-    } catch (SQLException ex) {
-      throw new RuntimeException(ex);
-    }
-  }
-
-  @Override
-  public void processTuple(int tableNum, HashMap<String, Object> tuple)
-  {
-    InputSchema inputSchema = inputSchemas.get(tableNum);
-
-    PreparedStatement insertStatement = insertStatements.get(tableNum);
-    try {
-      for (Map.Entry<String, Object> entry : tuple.entrySet()) {
-        ColumnInfo t = inputSchema.columnInfoMap.get(entry.getKey());
-        if (t != null && t.bindIndex != 0) {
-          insertStatement.setString(t.bindIndex, entry.getValue().toString());
-        }
-      }
-
-      insertStatement.executeUpdate();
-      insertStatement.clearParameters();
-    } catch (SQLException ex) {
-      throw new RuntimeException(ex);
-    }
-  }
-
-  @Override
-  public void endWindow()
-  {
-    try {
-      db.commit();
-      if (bindings != null) {
-        for (int i = 0; i < bindings.size(); i++) {
-          for (PreparedStatement stmt : execStatements) {
-            stmt.setString(i, bindings.get(i).toString());
-          }
-        }
-      }
-
-      for (PreparedStatement stmt : execStatements) {
-        executePreparedStatement(stmt);
-      }
-      for (PreparedStatement st : deleteStatements) {
-        st.executeUpdate();
-        st.clearParameters();
-      }
-    } catch (SQLException ex) {
-      throw new RuntimeException(ex);
-    }
-    bindings = null;
-  }
-
-  private void executePreparedStatement(PreparedStatement statement) throws SQLException
-  {
-    ResultSet res = statement.executeQuery();
-    ResultSetMetaData resmeta = res.getMetaData();
-    int columnCount = resmeta.getColumnCount();
-    while (res.next()) {
-      HashMap<String, Object> resultRow = new HashMap<String, Object>();
-      for (int i = 1; i <= columnCount; i++) {
-        resultRow.put(resmeta.getColumnName(i), res.getObject(i));
-      }
-      this.result.emit(resultRow);
-    }
-    statement.clearParameters();
-  }
-
-  @Override
-  public void teardown()
-  {
-    try {
-      db.close();
-    } catch (SQLException ex) {
-      throw new RuntimeException(ex);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/GroupByHavingOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/GroupByHavingOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/GroupByHavingOperator.java
deleted file mode 100644
index 1821953..0000000
--- a/library/src/main/java/com/datatorrent/lib/streamquery/GroupByHavingOperator.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.validation.constraints.NotNull;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.lib.streamquery.condition.Condition;
-import com.datatorrent.lib.streamquery.condition.HavingCondition;
-import com.datatorrent.lib.streamquery.function.FunctionIndex;
-import com.datatorrent.lib.streamquery.index.ColumnIndex;
-
-/**
- * An implementation of BaseOperator that provides sql group by querying semantics on live data stream. <br>
- * <p>
- * Stream rows satisfying given select condition are processed by group by
- * column names and aggregate column function. <br>
- * If having condition is specified for aggregate index(s), it must also be
- * satisfied by row. HashMap of column name(s) and aggregate alias is emitted on
- * output port. <br>
- * <br>
- * <b>StateFull : Yes,</b> Operator aggregates input over application window. <br>
- * <b>Partitions : No, </b> will yield wrong result(s). <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b> inport : </b> Input hash map(row) port, expects
- * HashMap&lt;String,Object&gt;<<br>
- * <b> outport : </b> Output hash map(row) port, emits
- * HashMap&lt;String,Object&gt;<br>
- * <br>
- * <b> Properties : <b> <br>
- * <b> condition : </b> Select condition for deleting rows. <br>
- * <b> columnGroupIndexes : </b> Group by names list. <br>
- * <b> indexes : </b> Select column indexes. <br>
- * <b> havingConditions : </b> Having filter conditions for aggregate(s). <br>
- * <br>
- * @displayName GroupBy Having Operator
- * @category Stream Manipulators
- * @tags sql, groupby operator, condition, index
- * @since 0.3.4
- */
-@OperatorAnnotation(partitionable = false)
-public class GroupByHavingOperator extends BaseOperator
-{
-
-  /**
-   * aggregate indexes.
-   */
-  private ArrayList<FunctionIndex> aggregates = new ArrayList<FunctionIndex>();
-
-  /**
-   * Column, Group by names
-   */
-  private ArrayList<ColumnIndex> columnGroupIndexes = new ArrayList<ColumnIndex>();
-
-  /**
-   * where condition.
-   */
-  private Condition condition;
-
-  /**
-   * having aggregate condtion;
-   */
-  private ArrayList<HavingCondition> havingConditions = new ArrayList<HavingCondition>();
-
-  /**
-   * Table rows.
-   */
-  private ArrayList<Map<String, Object>> rows = new ArrayList<Map<String, Object>>();
-
-  public void addAggregateIndex(@NotNull FunctionIndex index)
-  {
-    aggregates.add(index);
-  }
-
-  public void addColumnGroupByIndex(@NotNull ColumnIndex index)
-  {
-    columnGroupIndexes.add(index);
-  }
-
-  public void addHavingCondition(@NotNull HavingCondition condition)
-  {
-    havingConditions.add(condition);
-  }
-
-  /**
-   * @param condition condition
-   */
-  public void setCondition(Condition condition)
-  {
-    this.condition = condition;
-  }
-
-  /**
-   * Input port that takes a map of &lt;string,object&gt;.
-   */
-  public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
-  {
-
-    @Override
-    public void process(Map<String, Object> tuple)
-    {
-      if ((condition != null) && (!condition.isValidRow(tuple))) {
-        return;
-      }
-      rows.add(tuple);
-    }
-  };
-
-  /**
-   * Output port that emits a map of &lt;string,object&gt;.
-   */
-  public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>();
-
-  /**
-   * Create aggregate at end window.
-   */
-  @Override
-  public void endWindow()
-  {
-    // group names
-    if (columnGroupIndexes.size() == 0) {
-      rows = new ArrayList<Map<String, Object>>();
-      return;
-    }
-
-    // group rows
-    HashMap<MultiKeyCompare, ArrayList<Map<String, Object>>> groups = new HashMap<MultiKeyCompare, ArrayList<Map<String, Object>>>();
-    for (Map<String, Object> row : rows) {
-      MultiKeyCompare key = new MultiKeyCompare();
-      for (ColumnIndex index : columnGroupIndexes) {
-        key.addCompareKey(row.get(index.getColumn()));
-      }
-      ArrayList<Map<String, Object>> subRows;
-      if (groups.containsKey(key)) {
-        subRows = groups.get(key);
-      } else {
-        subRows = new ArrayList<Map<String, Object>>();
-        groups.put(key, subRows);
-      }
-      subRows.add(row);
-    }
-
-    // Iterate over groups and emit aggregate values
-    for (Map.Entry<MultiKeyCompare, ArrayList<Map<String, Object>>> entry : groups
-        .entrySet()) {
-      ArrayList<Map<String, Object>> subRows = entry.getValue();
-
-      // get result
-      Map<String, Object> result = new HashMap<String, Object>();
-      for (ColumnIndex index : columnGroupIndexes) {
-        index.filter(subRows.get(0), result);
-      }
-
-      // append aggregate values
-      for (FunctionIndex aggregate : aggregates) {
-        try {
-          aggregate.filter(subRows, result);
-        } catch (Exception e) {
-          e.printStackTrace();
-        }
-      }
-
-      // check valid having aggregate
-      boolean isValidHaving = true;
-      for (HavingCondition condition : havingConditions) {
-        try {
-          isValidHaving &= condition.isValidAggregate(subRows);
-        } catch (Exception e) {
-          e.printStackTrace();
-          return;
-        }
-      }
-      if (isValidHaving) {
-        outport.emit(result);
-      }
-    }
-
-    rows = new ArrayList<Map<String, Object>>();
-  }
-
-  /**
-   * multi key compare class.
-   */
-  @SuppressWarnings("rawtypes")
-  private class MultiKeyCompare implements Comparable
-  {
-
-    /**
-     * compare keys.
-     */
-    ArrayList<Object> compareKeys = new ArrayList<Object>();
-
-    @Override
-    public boolean equals(Object other)
-    {
-      if (other instanceof MultiKeyCompare) {
-        if (compareKeys.size() != ((MultiKeyCompare)other).compareKeys.size()) {
-          return false;
-        }
-      }
-      for (int i = 0; i < compareKeys.size(); i++) {
-        if (!(compareKeys.get(i).equals(((MultiKeyCompare)other).compareKeys.get(i)))) {
-          return false;
-        }
-      }
-      return true;
-    }
-
-    @Override
-    public int hashCode()
-    {
-      int hashCode = 0;
-      for (int i = 0; i < compareKeys.size(); i++) {
-        hashCode += compareKeys.get(i).hashCode();
-      }
-      return hashCode;
-    }
-
-    @Override
-    public int compareTo(Object other)
-    {
-      if (this.equals(other)) {
-        return 0;
-      }
-      return -1;
-    }
-
-    /**
-     * Add compare key.
-     */
-    public void addCompareKey(Object value)
-    {
-      compareKeys.add(value);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/InnerJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/InnerJoinOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/InnerJoinOperator.java
deleted file mode 100644
index 883329e..0000000
--- a/library/src/main/java/com/datatorrent/lib/streamquery/InnerJoinOperator.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-import com.datatorrent.lib.streamquery.condition.Condition;
-import com.datatorrent.lib.streamquery.index.Index;
-
-/**
- * An implementation of Operator that reads table row data from two table data input ports. <br>
- * <p>
- * Operator joins row on given condition and selected names, emits
- * joined result at output port.
- *  <br>
- *  <b>StateFull : Yes,</b> Operator aggregates input over application window. <br>
- *  <b>Partitions : No, </b> will yield wrong result(s). <br>
- *  <br>
- *  <b>Ports : </b> <br>
- *  <b> inport1 : </b> Input port for table 1, expects HashMap&lt;String, Object&gt; <br>
- *  <b> inport1 : </b> Input port for table 2, expects HashMap&lt;String, Object&gt; <br>
- *  <b> outport : </b> Output joined row port, emits HashMap&lt;String, ArrayList&lt;Object&gt;&gt; <br>
- *  <br>
- *  <b> Properties : </b>
- *  <b> joinCondition : </b> Join condition for table rows. <br>
- *  <b> table1Columns : </b> Columns to be selected from table1. <br>
- *  <b> table2Columns : </b> Columns to be selected from table2. <br>
- *  <br>
- * @displayName Inner join
- * @category Stream Manipulators
- * @tags sql, inner join operator
- *
- * @since 0.3.3
- */
-@OperatorAnnotation(partitionable = false)
-public class InnerJoinOperator implements Operator
-{
-
-  /**
-   * Join Condition;
-   */
-  protected Condition joinCondition;
-
-  /**
-   * Table1 select columns.
-   */
-  private ArrayList<Index> table1Columns = new ArrayList<Index>();
-
-  /**
-   * Table2 select columns.
-   */
-  private ArrayList<Index> table2Columns = new ArrayList<Index>();
-
-  /**
-   * Collect data rows from input port 1.
-   */
-  protected ArrayList<Map<String, Object>> table1;
-
-  /**
-   * Collect data from input port 2.
-   */
-  protected ArrayList<Map<String, Object>> table2;
-
-  /**
-   * Input port 1 that takes a map of &lt;string,object&gt;.
-   */
-  public final transient DefaultInputPort<Map<String, Object>> inport1 = new DefaultInputPort<Map<String, Object>>()
-  {
-    @Override
-    public void process(Map<String, Object> tuple)
-    {
-      table1.add(tuple);
-      for (int j = 0; j < table2.size(); j++) {
-        if ((joinCondition == null) || (joinCondition.isValidJoin(tuple, table2.get(j)))) {
-          joinRows(tuple, table2.get(j));
-        }
-      }
-    }
-  };
-
-  /**
-   * Input port 2 that takes a map of &lt;string,object&gt;.
-   */
-  public final transient DefaultInputPort<Map<String, Object>> inport2 = new DefaultInputPort<Map<String, Object>>()
-  {
-    @Override
-    public void process(Map<String, Object> tuple)
-    {
-      table2.add(tuple);
-      for (int j = 0; j < table1.size(); j++) {
-        if ((joinCondition == null) || (joinCondition.isValidJoin(table1.get(j), tuple))) {
-          joinRows(table1.get(j), tuple);
-        }
-      }
-    }
-  };
-
-  /**
-   * Output port that emits a map of &lt;string,object&gt;.
-   */
-  public final transient DefaultOutputPort<Map<String, Object>> outport =
-      new DefaultOutputPort<Map<String, Object>>();
-
-  @Override
-  public void setup(OperatorContext arg0)
-  {
-    table1 = new ArrayList<Map<String, Object>>();
-    table2 = new ArrayList<Map<String, Object>>();
-  }
-
-  @Override
-  public void teardown()
-  {
-  }
-
-  @Override
-  public void beginWindow(long arg0)
-  {
-  }
-
-  @Override
-  public void endWindow()
-  {
-    table1.clear();
-    table2.clear();
-  }
-
-  /**
-   * @return the joinCondition
-   */
-  public Condition getJoinCondition()
-  {
-    return joinCondition;
-  }
-
-  /**
-   * Pick the supported condition. Currently only equal join is supported.
-   * @param joinCondition joinCondition
-   */
-  public void setJoinCondition(Condition joinCondition)
-  {
-    this.joinCondition = joinCondition;
-  }
-
-  /**
-   * Select table1 column name.
-   */
-  public void selectTable1Column(Index column)
-  {
-    table1Columns.add(column);
-  }
-
-  /**
-   * Select table2 column name.
-   */
-  public void selectTable2Column(Index column)
-  {
-    table2Columns.add(column);
-  }
-
-  /**
-   * Join row from table1 and table2.
-   */
-  protected void joinRows(Map<String, Object> row1, Map<String, Object> row2)
-  {
-    // joined row
-    Map<String, Object> join = new HashMap<String, Object>();
-
-    // filter table1 columns
-    if (row1 != null) {
-      for (Index index: table1Columns) {
-        index.filter(row1, join);
-      }
-    }
-
-    // filter table1 columns
-    if (row2 != null) {
-      for (Index index: table2Columns) {
-        index.filter(row2, join);
-      }
-    }
-
-    // emit row
-    outport.emit(join);
-  }
-
-}