You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/09/01 18:30:50 UTC

[06/12] apex-malhar git commit: Updated algo & working on math operators

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/FilterKeysHashMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/FilterKeysHashMap.java b/library/src/main/java/com/datatorrent/lib/algo/FilterKeysHashMap.java
deleted file mode 100644
index 0f9a738..0000000
--- a/library/src/main/java/com/datatorrent/lib/algo/FilterKeysHashMap.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.validation.constraints.NotNull;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-import com.datatorrent.api.annotation.Stateless;
-
-import com.datatorrent.lib.util.BaseKeyOperator;
-
-
-/**
- * This operator filters the incoming stream of key value pairs based on the keys specified by property "keys".
- * <p>
- * Filters the incoming stream based of keys specified by property "keys". If
- * property "inverse" is set to "true", then all keys except those specified by "keys" are emitted
- * </p>
- * <p>
- * Operator assumes that the key, val pairs are immutable objects. If this operator has to be used for mutable objects,
- * override "cloneKey()" to make copy of K, and "cloneValue()" to make copy of V.<br>
- * This is a pass through node.<br>
- * <br>
- * <b>StateFull : No, </b> tuple are processed in current window. <br>
- * <b>Partitions : Yes, </b> no dependency among input tuples. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: Expects Map&lt;K, HashMap&lt;K,V&gt;&gt;. Filters are applied only on keys of the inner hash map.<br>
- * <b>filter</b>: Emits HashMap&lt;K, HashMap&lt;K,V&gt;&gt;.<br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>keys</b>: The keys to pass through, rest are filtered/dropped. A comma separated list of keys.<br>
- * <br>
- * </p>
- *
- * @displayName Filter Keyval Pairs By Key HashMap
- * @category Stream Manipulators
- * @tags filter, key value
- *
- * @since 0.3.2
- */
-@Stateless
-@OperatorAnnotation(partitionable = true)
-public class FilterKeysHashMap<K, V> extends BaseKeyOperator<K>
-{
-  /**
-   * Filter keys map.
-   */
-  @NotNull()
-  HashMap<K, V> keys = new HashMap<K, V>();
-
-  /**
-   * Emits key not in filter map.
-   */
-  boolean inverse = false;
-
-  /**
-   * The input port on which key value pairs are received.
-   */
-  public final transient DefaultInputPort<Map<K, HashMap<K, V>>> data = new DefaultInputPort<Map<K, HashMap<K, V>>>()
-  {
-    /**
-     * Processes incoming tuples one key,val at a time. Emits if at least one key makes the cut.
-     * By setting inverse as true, match is changed to un-matched.
-     */
-    @Override
-    public void process(Map<K, HashMap<K, V>> tuple)
-    {
-      HashMap<K, HashMap<K, V>> dtuple = null;
-      for (Map.Entry<K, HashMap<K, V>> e: tuple.entrySet()) {
-        HashMap<K, V> dtuple2 = null;
-        for (Map.Entry<K, V> e2: e.getValue().entrySet()) {
-          boolean contains = keys.containsKey(e2.getKey());
-          if ((contains && !inverse) || (!contains && inverse)) {
-            if (dtuple2 == null) {
-              dtuple2 = new HashMap<K, V>(4); // usually the filter keys are very few, so 4 is just fine
-            }
-            dtuple2.put(cloneKey(e2.getKey()), cloneValue(e2.getValue()));
-          }
-        }
-        if (dtuple == null && dtuple2 != null) {
-          dtuple = new HashMap<K, HashMap<K, V>>();
-        }
-        if (dtuple != null && dtuple2 != null) {
-          dtuple.put(cloneKey(e.getKey()), dtuple2);
-        }
-      }
-      if (dtuple != null) {
-        filter.emit(dtuple);
-      }
-    }
-  };
-
-  /**
-   * The output port on which filtered key value pairs are emitted.
-   */
-  public final transient DefaultOutputPort<HashMap<K, HashMap<K, V>>> filter = new DefaultOutputPort<HashMap<K, HashMap<K, V>>>();
-
-  /**
-   * getter function for parameter inverse
-   *
-   * @return inverse
-   */
-  public boolean getInverse()
-  {
-    return inverse;
-  }
-
-  /**
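-   * Sets the inverse flag. If false, entries whose key is in the filter list are emitted; if true, entries whose key is not in the filter list are emitted.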
-   *
-   * @param val
-   */
-  public void setInverse(boolean val)
-  {
-    inverse = val;
-  }
-
-  /**
-   * Adds a key to the filter list
-   *
-   * @param str
-   */
-  public void setKey(K str)
-  {
-    keys.put(str, null);
-  }
-
-  /**
-   * Adds the list of keys to the filter list
-   *
-   * @param list
-   */
-  public void setKeys(K[] list)
-  {
-    if (list != null) {
-      for (K e: list) {
-        keys.put(e, null);
-      }
-    }
-  }
-
-  /**
-   * Clears the filter list
-   */
-  public void clearKeys()
-  {
-    keys.clear();
-  }
-
-  /**
-   * Clones V object. By default assumes immutable object (i.e. a copy is not made). If object is mutable, override this method
-   *
-   * @param v value to be cloned
-   * @return returns v as is (assumes immutable object)
-   */
-  public V cloneValue(V v)
-  {
-    return v;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/FilterKeysMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/FilterKeysMap.java b/library/src/main/java/com/datatorrent/lib/algo/FilterKeysMap.java
deleted file mode 100644
index 136a5d4..0000000
--- a/library/src/main/java/com/datatorrent/lib/algo/FilterKeysMap.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.validation.constraints.NotNull;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-import com.datatorrent.api.annotation.Stateless;
-
-import com.datatorrent.lib.util.BaseKeyOperator;
-import com.datatorrent.lib.util.UnifierHashMap;
-
-/**
- * This operator filters the incoming stream of key value pairs based on the keys specified by property "keys".
- * <p>
- * Filters the incoming stream based of keys specified by property "keys". If
- * property "inverse" is set to "true", then all keys except those specified by "keys" are emitted
- * </p>
- * <p>
- * Operator assumes that the key, val pairs are immutable objects. If this operator has to be used for mutable objects,
- * override "cloneKey()" to make copy of K, and "cloneValue()" to make copy of V.<br>
- * This is a pass through node<br>
- * <br>
- * <b>StateFull : No, </b> tuple are processed in current window. <br>
- * <b>Partitions : Yes, </b> no dependency among input tuples. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: Expects Map&lt;K,V&gt;<br>
- * <b>filter</b>: Emits HashMap&lt;K,V&gt;<br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>keys</b>: The keys to pass through, rest are filtered/dropped. A comma separated list of keys<br>
- * <br>
- * </p>
- *
- * @displayName Filter Keyval Pairs By Key Generic
- * @category Rules and Alerts
- * @tags filter, key value
- *
- * @since 0.3.2
- */
-@Stateless
-@OperatorAnnotation(partitionable = true)
-public class FilterKeysMap<K,V> extends BaseKeyOperator<K>
-{
-  /**
-   * Filter keys map.
-   */
-  @NotNull()
-  HashMap<K, V> keys = new HashMap<K, V>();
-
-  /**
-   * Emits key not in filter map.
-   */
-  boolean inverse = false;
-
-  /**
-   * The input port on which key value pairs are received.
-   */
-  public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>()
-  {
-    /**
-     * Processes incoming tuples one key,val at a time. Emits if at least one key makes the cut
-     * By setting inverse as true, match is changed to un-matched
-     */
-    @Override
-    public void process(Map<K, V> tuple)
-    {
-      HashMap<K, V> dtuple = null;
-      for (Map.Entry<K, V> e: tuple.entrySet()) {
-        boolean contains = keys.containsKey(e.getKey());
-        if ((contains && !inverse) || (!contains && inverse)) {
-          if (dtuple == null) {
-            dtuple = new HashMap<K, V>(4); // usually the filter keys are very few, so 4 is just fine
-          }
-          dtuple.put(cloneKey(e.getKey()), cloneValue(e.getValue()));
-        }
-      }
-      if (dtuple != null) {
-        filter.emit(dtuple);
-      }
-    }
-  };
-
-  /**
-   * The output port on which filtered key value pairs are emitted.
-   */
-  public final transient DefaultOutputPort<HashMap<K, V>> filter = new DefaultOutputPort<HashMap<K, V>>()
-  {
-    @Override
-    public Unifier<HashMap<K, V>> getUnifier()
-    {
-      return new UnifierHashMap<K, V>();
-    }
-  };
-
-  /**
-   * Gets the inverse property. If false then only matches are emitted. If true then only non matches are emitted.
-   * @return inverse
-   */
-  public boolean getInverse()
-  {
-    return inverse;
-  }
-
-
-  /**
-   * Sets the inverse property. If false then only matches are emitted. If true then only non matches are emitted.
-   * @param val
-   */
-  public void setInverse(boolean val)
-  {
-    inverse = val;
-  }
-
-  /**
-   * Adds a key to the filter list
-   * @param str
-   */
-  public void setKey(K str)
-  {
-    keys.put(str, null);
-  }
-
-  /**
-   * Adds the list of keys to the filter list
-   * @param list
-   */
-  public void setKeys(K[] list)
-  {
-    if (list != null) {
-      for (K e: list) {
-        keys.put(e, null);
-      }
-    }
-  }
-
-  /**
-   * The keys to filter. The values in the map should be null.
-   * @param keys
-   */
-  public void setKeys(HashMap<K, V> keys)
-  {
-    this.keys = keys;
-  }
-
-  /**
-   * Gets the keys to filter.
-   * @return Returns a map containing the keys.
-   */
-  public HashMap<K, V> getKeys()
-  {
-    return keys;
-  }
-
-  /**
-   * Clears the filter list
-   */
-  public void clearKeys()
-  {
-    keys.clear();
-  }
-
-  /**
-   * Clones V object. By default assumes immutable object (i.e. a copy is not made). If object is mutable, override this method
-   * @param v value to be cloned
-   * @return returns v as is (assumes immutable object)
-   */
-  public V cloneValue(V v)
-  {
-    return v;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/FilterValues.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/FilterValues.java b/library/src/main/java/com/datatorrent/lib/algo/FilterValues.java
index 3b78ac4..c7a70bb 100644
--- a/library/src/main/java/com/datatorrent/lib/algo/FilterValues.java
+++ b/library/src/main/java/com/datatorrent/lib/algo/FilterValues.java
@@ -18,7 +18,8 @@
  */
 package com.datatorrent.lib.algo;
 
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.HashSet;
 
 import javax.validation.constraints.NotNull;
 
@@ -72,7 +73,7 @@ public class FilterValues<T> extends BaseOperator
     @Override
     public void process(T tuple)
     {
-      boolean contains = values.containsKey(tuple);
+      boolean contains = values.contains(tuple);
       if ((contains && !inverse) || (!contains && inverse)) {
         filter.emit(cloneValue(tuple));
       }
@@ -85,7 +86,7 @@ public class FilterValues<T> extends BaseOperator
   public final transient DefaultOutputPort<T> filter = new DefaultOutputPort<T>();
 
   @NotNull()
-  HashMap<T, Object> values = new HashMap<T, Object>();
+  HashSet<T> values = new HashSet<T>();
   boolean inverse = false;
 
   /**
@@ -114,7 +115,7 @@ public class FilterValues<T> extends BaseOperator
   public void setValue(T val)
   {
     if (val != null) {
-      values.put(val, null);
+      values.add(val);
     }
   }
 
@@ -126,9 +127,7 @@ public class FilterValues<T> extends BaseOperator
   public void setValues(T[] list)
   {
     if (list != null) {
-      for (T e: list) {
-        values.put(e, null);
-      }
+      values.addAll(Arrays.asList(list));
     }
   }
 
@@ -136,7 +135,7 @@ public class FilterValues<T> extends BaseOperator
    * Gets the values to be filtered.
    * @return The values to be filtered.
    */
-  public HashMap<T, Object> getValues()
+  public HashSet<T> getValues()
   {
     return values;
   }
@@ -146,7 +145,7 @@ public class FilterValues<T> extends BaseOperator
    * values are set to be null.
    * @param values The values to be filtered.
    */
-  public void setValues(HashMap<T, Object> values)
+  public void setValues(HashSet<T> values)
   {
     this.values = values;
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/FirstMatchMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/FirstMatchMap.java b/library/src/main/java/com/datatorrent/lib/algo/FirstMatchMap.java
deleted file mode 100644
index 991b24b..0000000
--- a/library/src/main/java/com/datatorrent/lib/algo/FirstMatchMap.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-
-import com.datatorrent.lib.util.BaseMatchOperator;
-
-/**
- * This operator filters the incoming stream of key value pairs by obtaining the values corresponding to a specified key,
- * and comparing those values to a specified number.&nbsp;The first key value pair, in each window, to satisfy the comparison is emitted.
- * <p>
- * A compare metric on a Number tuple based on the property "key", "value", and "cmp"; the first match is emitted.
- *  The comparison is done by getting double value from the Number.
- * </p>
- * <p>
- * This module is a pass through<br>
- * The operators by default assumes immutable keys. If the key is mutable, use cloneKey to make a copy<br>
- * <br>
- * <b>StateFull : Yes, </b> tuple are processed in current window. <br>
- * <b>Partitions : No, </b>will yield wrong results. <br>
- * <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects Map&lt;K,V extends Number&gt;<br>
- * <b>first</b>: emits HashMap&lt;K,V&gt;<br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>key</b>: The key on which compare is done<br>
- * <b>value</b>: The value to compare with<br>
- * <b>cmp</b>: The compare function. Supported values are "lte", "lt", "eq", "neq", "gt", "gte". Default is "eq"<br>
- * <br>
- * <b>Specific compile time checks</b>:<br>
- * Key must be non empty<br>
- * Value must be able to convert to a "double"<br>
- * Compare string, if specified, must be one of "lte", "lt", "eq", "neq", "gt", "gte"<br>
- * <br>
- * </p>
- *
- * @displayName Emit First Match (Number)
- * @category Rules and Alerts
- * @tags filter, key value, numeric
- *
- * @since 0.3.2
- */
-
-@OperatorAnnotation(partitionable = false)
-public class FirstMatchMap<K, V extends Number> extends BaseMatchOperator<K,V>
-{
-  /**
-   * Tuple emitted flag.
-   */
-  boolean emitted = false;
-
-  /**
-   * The port on which key value pairs are received.
-   */
-  public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>()
-  {
-    /**
-     * Checks if required key,val pair exists in the HashMap. If so tuple is emitted, and emitted flag is set
-     * to true
-     */
-    @Override
-    public void process(Map<K, V> tuple)
-    {
-      if (emitted) {
-        return;
-      }
-      V val = tuple.get(getKey());
-      if (val == null) { // skip if key does not exist
-        return;
-      }
-      if (compareValue(val.doubleValue())) {
-        first.emit(cloneTuple(tuple));
-        emitted = true;
-      }
-    }
-  };
-
-  /**
-   * The output port on which the first satisfying key value pair is emitted.
-   */
-  public final transient DefaultOutputPort<HashMap<K, V>> first = new DefaultOutputPort<HashMap<K, V>>();
-
-  /**
-   * Resets emitted flag to false
-   * @param windowId
-   */
-  @Override
-  public void beginWindow(long windowId)
-  {
-    emitted = false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/FirstN.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/FirstN.java b/library/src/main/java/com/datatorrent/lib/algo/FirstN.java
deleted file mode 100644
index d9db3cf..0000000
--- a/library/src/main/java/com/datatorrent/lib/algo/FirstN.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.lang.mutable.MutableInt;
-
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-
-import com.datatorrent.lib.util.AbstractBaseNOperatorMap;
-
-/**
- * This operator filters the incoming stream of key value pairs by emitting the first N key value pairs with a specified key in each window.
- * <p>
- * Emits first N tuples of a particular key.
- * </p>
- * <p>
- * This module is a pass through module<br>
- * <br>
- * <b>StateFull : Yes, </b> tuples are compared across application window(s). <br>
- * <b>Partitions : No, </b> will yield wrong results. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: Input data port expects HashMap&lt;K,V&gt;<br>
- * <b>first</b>: Output data port, emits HashMap&lt;K,V&gt;<br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>N</b>: The number of key value pairs to be emitted per key in each window<br>
- * <br>
- * <b>Specific compile time checks are</b>:<br>
- * N: Has to be >= 1<br>
- * <br>
- * <br>
- * </p>
- *
- * @displayName First N Keyval Pairs Matching Key
- * @category Rules and Alerts
- * @tags filter, key value
- *
- * @since 0.3.2
- */
-@OperatorAnnotation(partitionable = false)
-public class FirstN<K,V> extends AbstractBaseNOperatorMap<K, V>
-{
-  /**
-   * key count map.
-   */
-  HashMap<K, MutableInt> keycount = new HashMap<K, MutableInt>();
-
-  /**
-   * Counts the occurrences of each key in the tuple and emits the key value pair while the count for that key is at most N
-   * @param tuple key value pairs to process
-   */
-  @Override
-  public void processTuple(Map<K, V> tuple)
-  {
-    for (Map.Entry<K, V> e: tuple.entrySet()) {
-      MutableInt count = keycount.get(e.getKey());
-      if (count == null) {
-        count = new MutableInt(0);
-        keycount.put(e.getKey(), count);
-      }
-      count.increment();
-      if (count.intValue() <= getN()) {
-        first.emit(cloneTuple(e.getKey(), e.getValue()));
-      }
-    }
-  }
-
-  /**
-   * The output port on which the first N key value pairs are emitted.
-   */
-  public final transient DefaultOutputPort<HashMap<K, V>> first = new DefaultOutputPort<HashMap<K, V>>();
-
-  /**
-   * Clears the cache to start anew in a new window
-   */
-  @Override
-  public void endWindow()
-  {
-    keycount.clear();
-  }
-
-  /**
-   * First N number of KeyValue pairs for each Key.
-   *
-   * @param val
-   */
-  public void setN(int val)
-  {
-    super.setN(val);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/FirstTillMatch.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/FirstTillMatch.java b/library/src/main/java/com/datatorrent/lib/algo/FirstTillMatch.java
deleted file mode 100644
index d6a2615..0000000
--- a/library/src/main/java/com/datatorrent/lib/algo/FirstTillMatch.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.HashMap;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-
-import com.datatorrent.lib.util.BaseMatchOperator;
-
-/**
- * This operator filters the incoming stream of key value pairs by obtaining the values corresponding to a specified key,
- * and comparing those values to a specified number.&nbsp;For each window, all key value pairs are emitted by the operator until a value satisfying the comparison is encountered.
- * <p>
- * All key,val pairs with val sub-classed from Number are emitted till the first match;  A compare metric is done based on the property "key",
- * "value", and "cmp". From then on, no tuple is emitted in that window. The comparison is done by getting double value of the Number.
- * </p>
- * <p>
- * This module is a pass through<br>
- * <br>
- * <b>StateFull : Yes, </b> tuple are processed in current window. <br>
- * <b>Partitions : No, </b>will yield wrong results. <br>
- * <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: Input port, expects HashMap&lt;K,V&gt;<br>
- * <b>first</b>: Output port, emits HashMap&lt;K,V&gt; if compare function returns true<br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>key</b>: The key on which compare is done<br>
- * <b>value</b>: The value to compare with<br>
- * <b>cmp</b>: The compare function. Supported values are "lte", "lt", "eq", "neq", "gt", "gte". Default is "eq"<br>
- * <br>
- * <b>Specific compile time checks</b>:<br>
- * Key must be non empty<br>
- * Value must be able to convert to a "double"<br>
- * Compare string, if specified, must be one of "lte", "lt", "eq", "neq", "gt", "gte"<br>
- * <br>
- * </p>
- *
- * @displayName Emit Keyval Pairs Until Match (Number)
- * @category Rules and Alerts
- * @tags filter, key value, numeric
- *
- * @since 0.3.2
- */
-@OperatorAnnotation(partitionable = false)
-public class FirstTillMatch<K, V extends Number> extends BaseMatchOperator<K, V>
-{
-  /**
-   * Tuple emitted flag.
-   */
-  boolean emitted = false;
-
-  /**
-   * The input port on which incoming key value pairs are received.
-   */
-  public final transient DefaultInputPort<HashMap<K, V>> data = new DefaultInputPort<HashMap<K, V>>()
-  {
-    /**
-     * Compares the key,val pair with the match condition. Till a match is found tuples are emitted.
-     * Once a match is found, state is set to emitted, and no more tuples are compared (no more emits).
-     */
-    @Override
-    public void process(HashMap<K, V> tuple)
-    {
-      if (emitted) {
-        return;
-      }
-      V val = tuple.get(getKey());
-      if (val == null) { // skip if the key does not exist
-        return;
-      }
-      if (compareValue(val.doubleValue())) {
-        emitted = true;
-      }
-      if (!emitted) {
-        first.emit(cloneTuple(tuple));
-      }
-    }
-  };
-
-  /**
-   * The output port on which key value pairs are emitted until the first match.
-   */
-  public final transient DefaultOutputPort<HashMap<K, V>> first = new DefaultOutputPort<HashMap<K, V>>();
-
-  /**
-   * Emitted flag is reset to false
-   * @param windowId
-   */
-  @Override
-  public void beginWindow(long windowId)
-  {
-    emitted = false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/InsertSortDesc.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/InsertSortDesc.java b/library/src/main/java/com/datatorrent/lib/algo/InsertSortDesc.java
deleted file mode 100644
index 4498cfb..0000000
--- a/library/src/main/java/com/datatorrent/lib/algo/InsertSortDesc.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.PriorityQueue;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.lib.util.AbstractBaseSortOperator;
-import com.datatorrent.lib.util.ReversibleComparator;
-
-/**
- * This operator takes the values it receives each window and outputs them in descending order at the end of each window.
- * <p>
- * Incoming tuple is inserted into already existing sorted list in a descending order. At the end of the window the resultant sorted list is emitted on the output ports.
- * </p>
- * <p>
- * <br>
- * <b>StateFull : Yes, </b> tuples are compared across application window(s). <br>
- * <b>Partitions : No, </b> will yield wrong results. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects K<br>
- * <b>datalist</b>: expects ArrayList&lt;K&gt;<br>
- * <b>sort</b>: emits ArrayList&lt;K&gt;<br>
- * <b>sorthash</b>: emits HashMap&lt;K,Integer&gt;<br>
- * <br>
- * <br>
- * </p>
- * @displayName Sort Descending
- * @category Stream Manipulators
- * @tags rank, sort
- *
- * @since 0.3.2
- */
-//
-// TODO: Override PriorityQueue and rewrite addAll to insert with location
-//
-@OperatorAnnotation(partitionable = false)
-public class InsertSortDesc<K> extends AbstractBaseSortOperator<K>
-{
-  /**
-   * The input port on which individual tuples are received for sorting.
-   */
-  @InputPortFieldAnnotation(optional = true)
-  public final transient DefaultInputPort<K> data = new DefaultInputPort<K>()
-  {
-    /**
-     * Adds tuple to sorted queue
-     */
-    @Override
-    public void process(K tuple)
-    {
-      processTuple(tuple);
-    }
-  };
-  /**
-   * The input port on which lists of tuples are received for sorting.
-   */
-  @InputPortFieldAnnotation(optional = true)
-  public final transient DefaultInputPort<ArrayList<K>> datalist = new DefaultInputPort<ArrayList<K>>()
-  {
-    /**
-     * Adds tuples to sorted queue
-     */
-    @Override
-    public void process(ArrayList<K> tuple)
-    {
-      processTuple(tuple);
-    }
-  };
-
-  /**
-   * The output port on which a sorted descending list of tuples is emitted.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<ArrayList<K>> sort = new DefaultOutputPort<ArrayList<K>>();
-  @OutputPortFieldAnnotation(optional = true)
-  /**
-   * This output port emits a map from tuples to a count of the number of times each tuple occurred in the application window.
-   */
-  public final transient DefaultOutputPort<HashMap<K, Integer>> sorthash = new DefaultOutputPort<HashMap<K, Integer>>();
-
-  @Override
-  public void initializeQueue()
-  {
-    pqueue = new PriorityQueue<K>(getSize(), new ReversibleComparator<K>(false));
-  }
-
-
-  @Override
-  public void emitToList(ArrayList<K> list)
-  {
-    sort.emit(list);
-  }
-
-  @Override
-  public void emitToHash(HashMap<K,Integer> map)
-  {
-    sorthash.emit(map);
-  }
-
-  @Override
-  public boolean doEmitList()
-  {
-    return sort.isConnected();
-  }
-
-  @Override
-  public boolean doEmitHash()
-  {
-    return sorthash.isConnected();
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/InvertIndex.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/InvertIndex.java b/library/src/main/java/com/datatorrent/lib/algo/InvertIndex.java
deleted file mode 100644
index 821dfb9..0000000
--- a/library/src/main/java/com/datatorrent/lib/algo/InvertIndex.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator.Unifier;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-
-import com.datatorrent.lib.util.BaseKeyValueOperator;
-
-/**
- * This operator takes a stream of key value pairs each window,
- * and outputs a set of inverted key value pairs at the end of each window.
- * <p>
- * Inverts the index and sends out the tuple on output port "index" at the end of the window.
- * </p>
- * <p>
- * This is an end of window operator<br>
- * <br>
- * <b>StateFull : Yes, </b> tuples are compared across application window(s). <br>
- * <b>Partitions : Yes, </b> inverted indexes are unified by instance of same operator. <br>
- * <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects &lt;K,V&gt;<br>
- * <b>index</b>: emits &lt;V,ArrayList&lt;K&gt;&gt;(1); one HashMap per V<br>
- * <br>
- * </p>
- *
- * @displayName Invert Key Value Pairs
- * @category Stream Manipulators
- * @tags key value
- *
- * @since 0.3.2
- */
-
-@OperatorAnnotation(partitionable = true)
-public class InvertIndex<K, V> extends BaseKeyValueOperator<K, V> implements Unifier<HashMap<V, ArrayList<K>>>
-{
-  /**
-   * Inverted key/value map: each value seen so far in the window maps to the
-   * list of keys it was received with. Cleared in endWindow().
-   */
-  protected HashMap<V, ArrayList<K>> map = new HashMap<V, ArrayList<K>>();
-
-  /**
-   * The input port on which key value pairs are received.
-   */
-  public final transient DefaultInputPort<HashMap<K, V>> data = new DefaultInputPort<HashMap<K, V>>()
-  {
-    /**
-     * Reverse indexes a {@code HashMap<K, V>} tuple: every entry with a non-null
-     * value is recorded under that value. Entries with a null value are skipped.
-     *
-     * @param tuple key value pairs to invert
-     */
-    @Override
-    public void process(HashMap<K, V> tuple)
-    {
-      for (Map.Entry<K, V> e: tuple.entrySet()) {
-        if (e.getValue() == null) { // error tuple?
-          continue;
-        }
-        insert(e.getValue(), cloneKey(e.getKey()));
-      }
-    }
-  };
-
-  /**
-   * The output port on which inverted key value pairs are emitted.
-   * Partitioned outputs are merged by another InvertIndex instance acting as unifier.
-   */
-  public final transient DefaultOutputPort<HashMap<V, ArrayList<K>>> index = new DefaultOutputPort<HashMap<V, ArrayList<K>>>()
-  {
-    @Override
-    public Unifier<HashMap<V, ArrayList<K>>> getUnifier()
-    {
-      return new InvertIndex<K, V>();
-    }
-  };
-
-  /**
-   * Appends a key to the list stored for a value, creating the list
-   * (registered under a clone of the value) the first time the value is seen.
-   *
-   * @param val value under which the key is indexed
-   * @param key key to append to the value's list
-   */
-  void insert(V val, K key)
-  {
-    ArrayList<K> list = map.get(val);
-    if (list == null) {
-      list = new ArrayList<K>(4);
-      map.put(cloneValue(val), list);
-    }
-    list.add(key);
-  }
-
-  /**
-   * Emits one single-entry map per indexed value on port "index",
-   * then clears the internal map.
-   */
-  @Override
-  public void endWindow()
-  {
-    for (Map.Entry<V, ArrayList<K>> e: map.entrySet()) {
-      HashMap<V, ArrayList<K>> tuple = new HashMap<V, ArrayList<K>>(1);
-      tuple.put(e.getKey(), e.getValue());
-      index.emit(tuple);
-    }
-    map.clear();
-  }
-
-  /**
-   * Unifier override: merges a partial inverted index into the internal map.
-   * The keys of each incoming entry are appended to the list already held for
-   * that value; a new list is created and stored when the value is not present yet.
-   *
-   * @param tuple partial inverted index emitted by one partition
-   */
-  @Override
-  public void process(HashMap<V, ArrayList<K>> tuple)
-  {
-    for (Map.Entry<V, ArrayList<K>> e: tuple.entrySet()) {
-      ArrayList<K> keys = map.get(e.getKey());
-      if (keys == null) {
-        keys = new ArrayList<K>();
-        // The list must live in the map, otherwise endWindow() has nothing to emit.
-        map.put(e.getKey(), keys);
-      }
-      keys.addAll(e.getValue());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/InvertIndexArray.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/InvertIndexArray.java b/library/src/main/java/com/datatorrent/lib/algo/InvertIndexArray.java
deleted file mode 100644
index 330049a..0000000
--- a/library/src/main/java/com/datatorrent/lib/algo/InvertIndexArray.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-
-import com.datatorrent.lib.util.BaseKeyValueOperator;
-
-/**
- * This operator takes a stream of key value pairs each window,
- * and outputs a set of inverted key value pairs at the end of each window.&nbsp;
- * The values in the key value pairs received by this operator are array lists, which may contain multiple values.
- * <p>
- * Inverts the index and sends out the tuple on output port "index" at the end of the window.
- * </p>
- * <p>
- * This is an end of window operator<br>
- * <br>
- * <b>StateFull : Yes, </b> tuple are compare across application window(s). <br>
- * <b>Partitions : Yes, </b> inverted indexes are unified by instance of same operator. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects HashMap&lt;K,ArrayList&lt;V&gt;&gt;<br>
- * <b>index</b>: emits HashMap&lt;V,ArrayList&lt;K&gt;&gt;(1), one HashMap per V<br>
- * <br>
- * </p>
- *
- * @displayName Invert Key Value Pairs (Array)
- * @category Stream Manipulators
- * @tags key value
- *
- * @since 0.3.2
- */
-
-@OperatorAnnotation(partitionable = true)
-public class InvertIndexArray<K, V> extends BaseKeyValueOperator<K,V>
-{
-  /**
-   * Inverted key/value map.
-   */
-  protected HashMap<V, ArrayList<K>> map = new HashMap<V, ArrayList<K>>();
-
-  /**
-   * The input port on which key value pairs are received.
-   */
-  public final transient DefaultInputPort<HashMap<K, ArrayList<V>>> data = new DefaultInputPort<HashMap<K, ArrayList<V>>>()
-  {
-    /**
-     * Reverse indexes a HashMap<K, ArrayList<V>> tuple
-     */
-    @Override
-    public void process(HashMap<K, ArrayList<V>> tuple)
-    {
-      for (Map.Entry<K, ArrayList<V>> e: tuple.entrySet()) {
-        ArrayList<V> alist = e.getValue();
-        if (alist == null) { // error tuple?
-          continue;
-        }
-        for (V val : alist) {
-          insert(val, cloneKey(e.getKey()));
-        }
-      }
-    }
-  };
-
-  /**
-   * The output port or which inverted key value pairs are emitted.
-   */
-  public final transient DefaultOutputPort<HashMap<V, ArrayList<K>>> index = new DefaultOutputPort<HashMap<V, ArrayList<K>>>()
-  {
-    @Override
-    public Unifier<HashMap<V, ArrayList<K>>> getUnifier()
-    {
-      return new InvertIndex<K, V>();
-    }
-  };
-
-  /**
-   *
-   * Returns the ArrayList stored for a key
-   *
-   * @param key
-   */
-  void insert(V val, K key)
-  {
-    ArrayList<K> list = map.get(val);
-    if (list == null) {
-      list = new ArrayList<K>(4);
-      map.put(cloneValue(val), list);
-    }
-    list.add(key);
-  }
-
-  /**
-   * Emit all the data and clear the hash
-   */
-  @Override
-  public void endWindow()
-  {
-    for (Map.Entry<V, ArrayList<K>> e: map.entrySet()) {
-      HashMap<V, ArrayList<K>> tuple = new HashMap<V, ArrayList<K>>(1);
-      tuple.put(e.getKey(), e.getValue());
-      index.emit(tuple);
-    }
-    map.clear();
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/LastMatchMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/LastMatchMap.java b/library/src/main/java/com/datatorrent/lib/algo/LastMatchMap.java
deleted file mode 100644
index 3235105..0000000
--- a/library/src/main/java/com/datatorrent/lib/algo/LastMatchMap.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-
-import com.datatorrent.lib.util.BaseMatchOperator;
-
-/**
- * This operator filters the incoming stream of key value pairs by obtaining the values corresponding to a specified key,
- * and comparing those values to a specified value.&nbsp;The last key value pair, in each window, to satisfy the comparison is emitted.
- * <p>
- * A compare function is  operated on a tuple value sub-classed from Number based on the property "key", "value", and "cmp". Every tuple
- * is checked and the last one that passes the condition is send during end of window on port "last". The comparison is done by getting double
- * value from the Number.
- * </p>
- * <p>
- * This module is an end of window module<br>
- * <br>
- * <b>StateFull : Yes, </b> tuple are compare across application window(s). <br>
- * <b>Partitions : No, </b> will yield wrong result. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects Map&lt;K,V extends Number&gt;<br>
- * <b>last</b>: emits Map&lt;K,V&gt; in end of window for the last tuple on which the compare function is true<br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>key</b>: The key on which compare is done<br>
- * <b>value</b>: The value to compare with<br>
- * <b>cmp</b>: The compare function. Supported values are "lte", "lt", "eq", "neq", "gt", "gte". Default is "eq"<br>
- * <br>
- * <b>Specific compile time checks</b>:<br>
- * Key must be non empty<br>
- * Value must be able to convert to a "double"<br>
- * Compare string, if specified, must be one of "lte", "lt", "eq", "neq", "gt", "gte"<br>
- * <br>
- * </p>
- *
- * @displayName Emit Last Match (Number)
- * @category Rules and Alerts
- * @tags filter, key value, numeric
- *
- * @since 0.3.2
- */
-@OperatorAnnotation(partitionable = false)
-public class LastMatchMap<K, V extends Number> extends BaseMatchOperator<K,V>
-{
-  /**
-   * Copy of the most recent tuple in the current window that satisfied the
-   * comparison, or null when none has matched yet.
-   */
-  protected HashMap<K, V> ltuple = null;
-
-  /**
-   * The input port on which key value pairs are received.
-   */
-  public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>()
-  {
-    /**
-     * Looks up the value stored under the configured key and, when it is
-     * present and its double value passes compareValue, remembers a clone of
-     * the tuple as the last match.
-     *
-     * @param tuple incoming key value pairs
-     */
-    @Override
-    public void process(Map<K, V> tuple)
-    {
-      V candidate = tuple.get(getKey());
-      if (candidate != null && compareValue(candidate.doubleValue())) {
-        ltuple = cloneTuple(tuple);
-      }
-    }
-  };
-
-  /**
-   * The output port on which the last key value pair to satisfy the comparison function is emitted.
-   */
-  public final transient DefaultOutputPort<HashMap<K, V>> last = new DefaultOutputPort<HashMap<K, V>>();
-
-  /**
-   * Emits the last matching tuple of the window, if there was one, and
-   * resets the stored match.
-   */
-  @Override
-  public void endWindow()
-  {
-    if (ltuple == null) {
-      return;
-    }
-    last.emit(ltuple);
-    ltuple = null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/LeastFrequentKeyMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/LeastFrequentKeyMap.java b/library/src/main/java/com/datatorrent/lib/algo/LeastFrequentKeyMap.java
deleted file mode 100644
index 2996b5a..0000000
--- a/library/src/main/java/com/datatorrent/lib/algo/LeastFrequentKeyMap.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-
-import com.datatorrent.lib.util.AbstractBaseFrequentKey;
-import com.datatorrent.lib.util.UnifierArrayHashMapFrequent;
-import com.datatorrent.lib.util.UnifierHashMapFrequent;
-
-/**
- * This operator filters the incoming stream of key value pairs by finding the key or keys (if there is a tie) that occur the fewest number of times within each window.&nbsp;
- * A list of the corresponding key value pairs are then output to the port named "list" and one of the corresponding key value pairs is output to the port "least", at the end of each window.
- * <p>
- * Occurrences of each key is counted and at the end of window any of the least frequent key is emitted on output port least and all least frequent
- * keys on output port list.
- * </p>
- * <p>
- * This module is an end of window module. In case of a tie any of the least key would be emitted. The list port would however have all the tied keys<br>
- * <br>
- * <b>StateFull : Yes, </b> tuple are compared across application window(s). <br>
- * <b>Partitions : Yes, </b> least keys are unified on output port. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects Map&lt;K,V&gt;, V is ignored/not used<br>
- * <b>least</b>: emits HashMap&lt;K,Integer&gt;(1); where K is the least frequent key, and Integer is the number of its occurrences in the window<br>
- * <b>list</b>: emits ArrayList&lt;HashMap&lt;K,Integer&gt;(1)&gt;; where the list includes all the keys that are least frequent<br>
- * <br>
- * </p>
- *
- * @displayName Emit Least Frequent Tuple Key
- * @category Rules and Alerts
- * @tags filter, key value, count
- *
- * @since 0.3.2
- */
-
-@OperatorAnnotation(partitionable = true)
-public class LeastFrequentKeyMap<K, V> extends AbstractBaseFrequentKey<K>
-{
-  /**
-   * The input port on which key value pairs are received.
-   */
-  public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>()
-  {
-    /**
-     * Passes every key of the incoming map to processTuple; the values are not used.
-     *
-     * @param tuple incoming key value pairs
-     */
-    @Override
-    public void process(Map<K, V> tuple)
-    {
-      for (K key : tuple.keySet()) {
-        processTuple(key);
-      }
-    }
-  };
-
-  /**
-   * Optional output port on which one of the keys that occurred the least
-   * number of times is emitted together with its count.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<HashMap<K, Integer>> least = new DefaultOutputPort<HashMap<K, Integer>>()
-  {
-    /**
-     * @return a UnifierHashMapFrequent with setLeast(true) applied
-     */
-    @Override
-    public Unifier<HashMap<K, Integer>> getUnifier()
-    {
-      UnifierHashMapFrequent<K> unifier = new UnifierHashMapFrequent<K>();
-      unifier.setLeast(true);
-      return unifier;
-    }
-  };
-
-  /**
-   * Optional output port on which all the keys that occurred the least
-   * number of times are emitted as a list.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<ArrayList<HashMap<K, Integer>>> list = new DefaultOutputPort<ArrayList<HashMap<K, Integer>>>()
-  {
-    /**
-     * @return a UnifierArrayHashMapFrequent with setLeast(true) applied
-     */
-    @Override
-    public Unifier<ArrayList<HashMap<K, Integer>>> getUnifier()
-    {
-      UnifierArrayHashMapFrequent<K> unifier = new UnifierArrayHashMapFrequent<K>();
-      unifier.setLeast(true);
-      return unifier;
-    }
-  };
-
-  /**
-   * Emits the given tuple on port "least".
-   *
-   * @param tuple single key/count pair to emit
-   */
-  @Override
-  public void emitTuple(HashMap<K, Integer> tuple)
-  {
-    least.emit(tuple);
-  }
-
-  /**
-   * Emits the given list on port "list".
-   *
-   * @param tlist list of key/count pairs to emit
-   */
-  @Override
-  public void emitList(ArrayList<HashMap<K, Integer>> tlist)
-  {
-    list.emit(tlist);
-  }
-
-  /**
-   * Count comparison used by the base class: strictly-less-than.
-   *
-   * @param val1 first count
-   * @param val2 second count
-   * @return true when val1 is smaller than val2
-   */
-  @Override
-  public boolean compareCount(int val1, int val2)
-  {
-    return val1 < val2;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/LeastFrequentKeyValueMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/LeastFrequentKeyValueMap.java b/library/src/main/java/com/datatorrent/lib/algo/LeastFrequentKeyValueMap.java
deleted file mode 100644
index a94121d..0000000
--- a/library/src/main/java/com/datatorrent/lib/algo/LeastFrequentKeyValueMap.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.HashMap;
-
-import com.datatorrent.api.DefaultOutputPort;
-
-import com.datatorrent.lib.util.AbstractBaseFrequentKeyValueMap;
-
-/**
- * This operator filters the incoming stream of key value pairs by finding the value or values (if there is a tie),
- * for each key, that occur the fewest number of times within each window.&nbsp;
- * Each key and its corresponding least values are emitted at the end of each window.
- * <p>
- * Occurrences of all values for each key is counted and at the end of window the least frequent values are emitted on output port least per key.
- * </p>
- * <p>
- * This module is an end of window module<br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects Map&lt;K,V&gt;<br>
- * <b>least</b>: Output port, emits HashMap&lt;K,HashMap&lt;V,Integer&gt;&gt;(1)<br>
- * <br>
- * <b>Properties</b>: None<br>
- * <br>
- * <b>Compile time checks</b>: None<br>
- * <b>Specific run time checks</b>: None <br>
- * <br>
- * <b>Benchmarks</b>: Blast as many tuples as possible in inline mode<br>
- * <table border="1" cellspacing=1 cellpadding=1 summary="Benchmark table for LeastFrequentKeyValueMap&lt;K,V&gt; operator template">
- * <tr><th>In-Bound</th><th>Out-bound</th><th>Comments</th></tr>
- * <tr><td><b>&gt; 30 Million K,V pairs/s</b></td><td>Emits only 1 tuple per window per key</td><td>In-bound throughput is the main determinant of performance.
- * The benchmark was done with immutable objects. If K or V are mutable the benchmark may be lower</td></tr>
- * </table><br>
- * </p>
- * <p>
- * <b>Function Table (K=String,V=Integer);</b>:
- * <table border="1" cellspacing=1 cellpadding=1 summary="Function table for LeastFrequentKeyValueMap&lt;K,V&gt; operator template">
- * <tr><th rowspan=2>Tuple Type (api)</th><th>In-bound (process)</th><th>Out-bound (emit)</th></tr>
- * <tr><th><i>data</i>(Map&lt;K,V&gt;)</th><th><i>least</i>(HashMap&lt;K,HashMap&lt;Integer&gt;&gt;)</th></tr>
- * <tr><td>Begin Window (beginWindow())</td><td>N/A</td><td>N/A</td></tr>
- * <tr><td>Data (process())</td><td>{a=1,b=5,c=110}</td><td></td></tr>
- * <tr><td>Data (process())</td><td>{a=55,c=2000,b=45}</td><td></td></tr>
- * <tr><td>Data (process())</td><td>{d=2}</td><td></td></tr>
- * <tr><td>Data (process())</td><td>{a=55,b=5,c=22}</td><td></td></tr>
- * <tr><td>Data (process())</td><td>{h=20,a=2,z=5}</td><td></td></tr>
- * <tr><td>Data (process())</td><td>{a=4,c=110}</td><td></td></tr>
- * <tr><td>Data (process())</td><td>{a=4,z=5}</td><td></td></tr>
- * <tr><td>End Window (endWindow())</td><td>N/A</td><td>{a={1=1,2=1},b={45=1},c={2000=1,22=1},d={2=1},h={20=1},z={5=2}</td></tr>
- * </table>
- * <br>
- * <br>
- * </p>
- *
- * @displayName Emit Least Frequent Keyval Pair
- * @category Rules and Alerts
- * @tags filter, key value, count
- *
- * @since 0.3.2
- */
-public class LeastFrequentKeyValueMap<K, V> extends AbstractBaseFrequentKeyValueMap<K, V>
-{
-  /**
-   * The output port on which the least frequent key value pairs are emitted.
-   */
-  public final transient DefaultOutputPort<HashMap<K, HashMap<V, Integer>>> least = new DefaultOutputPort<HashMap<K, HashMap<V, Integer>>>();
-
-  /**
-   * returns val1 < val2
-   * @param val1
-   * @param val2
-   * @return val1 < val2
-   */
-  @Override
-  public boolean compareValue(int val1, int val2)
-  {
-    return (val1 < val2);
-  }
-
-  /**
-   * Emits tuple on port "least"
-   * @param tuple
-   */
-  @Override
-  public void emitTuple(HashMap<K, HashMap<V, Integer>> tuple)
-  {
-    least.emit(tuple);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/MatchAllMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/MatchAllMap.java b/library/src/main/java/com/datatorrent/lib/algo/MatchAllMap.java
index f0c5d0e..0d258f6 100644
--- a/library/src/main/java/com/datatorrent/lib/algo/MatchAllMap.java
+++ b/library/src/main/java/com/datatorrent/lib/algo/MatchAllMap.java
@@ -64,8 +64,9 @@ import com.datatorrent.lib.util.UnifierBooleanAnd;
  * @tags filter, key value
  *
  * @since 0.3.2
+ * @deprecated
  */
-
+@Deprecated
 @OperatorAnnotation(partitionable = true)
 public class MatchAllMap<K, V extends Number> extends BaseMatchOperator<K, V>
 {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/MatchAnyMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/MatchAnyMap.java b/library/src/main/java/com/datatorrent/lib/algo/MatchAnyMap.java
index 41a92ed..f3ed5ae 100644
--- a/library/src/main/java/com/datatorrent/lib/algo/MatchAnyMap.java
+++ b/library/src/main/java/com/datatorrent/lib/algo/MatchAnyMap.java
@@ -66,8 +66,9 @@ import com.datatorrent.lib.util.UnifierBooleanOr;
  * @tags filter, key value
  *
  * @since 0.3.2
+ * @deprecated
  */
-
+@Deprecated
 @OperatorAnnotation(partitionable = true)
 public class MatchAnyMap<K, V extends Number> extends BaseMatchOperator<K,V>
 {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/MatchMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/MatchMap.java b/library/src/main/java/com/datatorrent/lib/algo/MatchMap.java
index e84c88b..c9b171c 100644
--- a/library/src/main/java/com/datatorrent/lib/algo/MatchMap.java
+++ b/library/src/main/java/com/datatorrent/lib/algo/MatchMap.java
@@ -65,7 +65,9 @@ import com.datatorrent.lib.util.UnifierHashMap;
  * @tags filter, key value, numeric
  *
  * @since 0.3.2
+ * @deprecated
  */
+@Deprecated
 @Stateless
 @OperatorAnnotation(partitionable = true)
 public class MatchMap<K,V extends Number> extends BaseMatchOperator<K, V>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/MergeSort.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/MergeSort.java b/library/src/main/java/com/datatorrent/lib/algo/MergeSort.java
index 9ccf76c..a3f8a0e 100644
--- a/library/src/main/java/com/datatorrent/lib/algo/MergeSort.java
+++ b/library/src/main/java/com/datatorrent/lib/algo/MergeSort.java
@@ -57,7 +57,9 @@ import com.datatorrent.api.annotation.OperatorAnnotation;
  * </p>
  *
  * @since 0.3.3
+ * @deprecated
  */
+@Deprecated
 @OperatorAnnotation(partitionable = true)
 public abstract class MergeSort<K>  implements Operator, Unifier<ArrayList<K>>
 {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/MergeSortNumber.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/MergeSortNumber.java b/library/src/main/java/com/datatorrent/lib/algo/MergeSortNumber.java
index e9d0eff..5839797 100644
--- a/library/src/main/java/com/datatorrent/lib/algo/MergeSortNumber.java
+++ b/library/src/main/java/com/datatorrent/lib/algo/MergeSortNumber.java
@@ -49,7 +49,9 @@ import com.datatorrent.api.annotation.OperatorAnnotation;
  * @tags rank, numeric
  *
  * @since 0.3.3
+ * @deprecated
  */
+@Deprecated
 @OperatorAnnotation(partitionable = true)
 public class MergeSortNumber<V extends Number> extends MergeSort<V>
 {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/MostFrequentKeyMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/MostFrequentKeyMap.java b/library/src/main/java/com/datatorrent/lib/algo/MostFrequentKeyMap.java
deleted file mode 100644
index c300c24..0000000
--- a/library/src/main/java/com/datatorrent/lib/algo/MostFrequentKeyMap.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-
-import com.datatorrent.lib.util.AbstractBaseFrequentKey;
-import com.datatorrent.lib.util.UnifierArrayHashMapFrequent;
-import com.datatorrent.lib.util.UnifierHashMapFrequent;
-
-/**
- * This operator filters the incoming stream of key value pairs by finding the key or keys (if there is a tie)
- * that occur the largest number of times within each window.&nbsp;
- * A list of the corresponding key value pairs are then output to the port named "list" and one of the corresponding key value pairs is output to the port "most", at the end of each window.
- * <p>
- * Occurrences of each key are counted and at the end of window any one of the most frequent keys is emitted on output port most and all most frequent
- * keys on output port list.
- * </p>
- * <p>
- * This module is an end of window module. In case of a tie any one of the most frequent keys would be emitted. The list port would however have all the tied keys<br>
- * <br>
- *  <b>StateFull : Yes</b>, Values are compared all over  application window can be > 1. <br>
- *  <b>Partitions : Yes</b>, Result is unified on output port. <br>
- *  <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects Map&lt;K,V&gt;, V is ignored/not used<br>
- * <b>most</b>: emits HashMap&lt;K,Integer&gt;(1); where K is the most frequent key, and Integer is the number of its occurrences in the window<br>
- * <b>list</b>: emits ArrayList&lt;HashMap&lt;K,Integer&gt;(1)&gt;; where the list includes all the keys that are most frequent<br>
- * <br>
- * </p>
- *
- * @displayName Emit Most Frequent Key
- * @category Rules and Alerts
- * @tags filter, key value, count
- *
- * @since 0.3.2
- */
-
-@OperatorAnnotation(partitionable = true)
-public class MostFrequentKeyMap<K,V> extends AbstractBaseFrequentKey<K>
-{
-  /**
-   * The input port which receives incoming key value pairs.
-   */
-  public final transient DefaultInputPort<Map<K,V>> data = new DefaultInputPort<Map<K,V>>()
-  {
-    /**
-     * Calls processTuple for each key in the incoming map; the values are not used.
-     *
-     * @param tuple incoming key value pairs
-     */
-    @Override
-    public void process(Map<K,V> tuple)
-    {
-      for (Map.Entry<K, V> e: tuple.entrySet()) {
-        processTuple(e.getKey());
-      }
-    }
-  };
-  /**
-   * The output port on which one of the keys that occurred the most
-   * number of times is emitted together with its count.
-   */
-  public final transient DefaultOutputPort<HashMap<K, Integer>> most = new DefaultOutputPort<HashMap<K, Integer>>()
-  {
-    /**
-     * @return a UnifierHashMapFrequent with setLeast(false) applied
-     */
-    @Override
-    public Unifier<HashMap<K, Integer>> getUnifier()
-    {
-      Unifier<HashMap<K, Integer>> ret = new UnifierHashMapFrequent<K>();
-      ((UnifierHashMapFrequent<K>)ret).setLeast(false);
-      return ret;
-    }
-  };
-
-
-  /**
-   * The output port on which all the keys that occurred the most
-   * number of times are emitted as a list.
-   */
-  public final transient DefaultOutputPort<ArrayList<HashMap<K, Integer>>> list = new DefaultOutputPort<ArrayList<HashMap<K, Integer>>>()
-  {
-    /**
-     * @return a UnifierArrayHashMapFrequent with setLeast(false) applied
-     */
-    @Override
-    public Unifier<ArrayList<HashMap<K, Integer>>> getUnifier()
-    {
-      // Keep the concrete type: the previous raw cast to UnifierHashMapFrequent targeted a
-      // different class than the one instantiated here, unlike LeastFrequentKeyMap.
-      UnifierArrayHashMapFrequent<K> ret = new UnifierArrayHashMapFrequent<K>();
-      ret.setLeast(false);
-      return ret;
-    }
-  };
-
-
-  /**
-   * Emits tuple on port "most"
-   * @param tuple single key/count pair to emit
-   */
-  @Override
-  public void emitTuple(HashMap<K, Integer> tuple)
-  {
-    most.emit(tuple);
-  }
-
-  /**
-   * Emits tuple on port "list"
-   * @param tlist list of key/count pairs to emit
-   */
-  @Override
-  public void emitList(ArrayList<HashMap<K, Integer>> tlist)
-  {
-    list.emit(tlist);
-  }
-
-  /**
-   * returns val1 > val2
-   * @param val1 first count
-   * @param val2 second count
-   * @return val1 > val2
-   */
-  @Override
-  public boolean compareCount(int val1, int val2)
-  {
-    return val1 > val2;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/MostFrequentKeyValueMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/MostFrequentKeyValueMap.java b/library/src/main/java/com/datatorrent/lib/algo/MostFrequentKeyValueMap.java
deleted file mode 100644
index 40a4372..0000000
--- a/library/src/main/java/com/datatorrent/lib/algo/MostFrequentKeyValueMap.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.HashMap;
-
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-
-import com.datatorrent.lib.util.AbstractBaseFrequentKeyValueMap;
-
-/**
- * This operator filters the incoming stream of key value pairs by finding the value or values (if there is a tie),
- * for each key, that occur the largest number of times within each window.&nbsp;
- * Each key and its corresponding most values are emitted at the end of each window.
- * <p>
- * Occurrences of all values for each key are counted and at the end of window the most frequent values are emitted on output port most per key
- * </p>
- * <p>
- * This module is an end of window module<br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects HashMap&lt;K,V&gt;<br>
- * <b>most</b>: emits HashMap&lt;K, HashMap&lt;V, Integer&gt;&gt;(1)<br>
- * <br>
- * <br>
- * <b>Properties</b>: None<br>
- * <br>
- * <b>Compile time checks</b>: None<br>
- * <b>Specific run time checks</b>: None <br>
- * <br>
- * <b>Benchmarks</b>: Blast as many tuples as possible in inline mode<br>
- * <table border="1" cellspacing=1 cellpadding=1 summary="Benchmark table for MostFrequentKeyValueMap&lt;K,V&gt; operator template">
- * <tr><th>In-Bound</th><th>Out-bound</th><th>Comments</th></tr>
- * <tr><td><b>&gt; 30 Million K,V pairs/s</b></td><td>Emits only 1 tuple per window per key</td><td>In-bound throughput is the main determinant of performance.
- * The benchmark was done with immutable objects. If K or V are mutable the benchmark may be lower</td></tr>
- * </table><br>
- * </p>
- * <p>
- * <b>Function Table (K=String,V=Integer);</b>:
- * <table border="1" cellspacing=1 cellpadding=1 summary="Function table for MostFrequentKeyValueMap&lt;K,V&gt; operator template">
- * <tr><th rowspan=2>Tuple Type (api)</th><th>In-bound (process)</th><th>Out-bound (emit)</th></tr>
- * <tr><th><i>data</i>(HashMap&lt;K,V&gt;)</th><th><i>most</i>(HashMap&lt;K,HashMap&lt;V,Integer&gt;&gt;)</th></tr>
- * <tr><td>Begin Window (beginWindow())</td><td>N/A</td><td>N/A</td></tr>
- * <tr><td>Data (process())</td><td>{a=1,b=5,c=110}</td><td></td></tr>
- * <tr><td>Data (process())</td><td>{a=55,c=2000,b=45}</td><td></td></tr>
- * <tr><td>Data (process())</td><td>{d=2}</td><td></td></tr>
- * <tr><td>Data (process())</td><td>{a=55,b=5,c=22}</td><td></td></tr>
- * <tr><td>Data (process())</td><td>{h=20,a=2,z=5}</td><td></td></tr>
- * <tr><td>Data (process())</td><td>{a=4,c=110}</td><td></td></tr>
- * <tr><td>Data (process())</td><td>{a=4,z=5}</td><td></td></tr>
- * <tr><td>End Window (endWindow())</td><td>N/A</td><td>{a={4=2,55=2},b={5=2},c={110=2},d={2=1},h={20=1},z={5=2}}</td></tr>
- * </table>
- * <br>
- * <br>
- * </p>
- *
- * @displayName Emit Most Frequent Keyval Pair
- * @category Rules and Alerts
- * @tags filter, key value, count
- *
- * @since 0.3.2
- */
-
-@OperatorAnnotation(partitionable = false)
-public class MostFrequentKeyValueMap<K, V> extends AbstractBaseFrequentKeyValueMap<K, V>
-{
-  /**
-   * The output port which emits a map from keys to their most values.
-   */
-  public final transient DefaultOutputPort<HashMap<K, HashMap<V, Integer>>> most = new DefaultOutputPort<HashMap<K, HashMap<V, Integer>>>();
-
-  /**
-   * returns val1 > val2
-   * @param val1
-   * @param val2
-   * @return val1 > val2
-   */
-  @Override
-  public boolean compareValue(int val1, int val2)
-  {
-    return (val1 > val2);
-  }
-
-  /**
-   * Emits tuple on port "most"
-   * @param tuple is emitted on port "most"
-   */
-  @Override
-  public void emitTuple(HashMap<K, HashMap<V, Integer>> tuple)
-  {
-    most.emit(tuple);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/Sampler.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/Sampler.java b/library/src/main/java/com/datatorrent/lib/algo/Sampler.java
deleted file mode 100644
index b087c9e..0000000
--- a/library/src/main/java/com/datatorrent/lib/algo/Sampler.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.Random;
-
-import javax.validation.constraints.Max;
-import javax.validation.constraints.Min;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-import com.datatorrent.api.annotation.Stateless;
-
-import com.datatorrent.lib.util.BaseKeyOperator;
-
-/**
- * This operator takes a stream of tuples as input, and emits each tuple with a specified probability.
- * <p>
- * Emits the tuple as per probability of pass rate out of total rate. <br>
- * <br>
- * An efficient filter to allow sample analysis of a stream. Very useful if the incoming stream has high throughput.
- * </p>
- * <p>
- * <br>
- * <b> StateFull : No, </b> tuple is processed in current window. <br>
- * <b> Partitions : Yes. </b> No state dependency among input tuples. <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects K<br>
- * <b>sample</b>: emits K<br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>samplingPercentage</b>: Fraction of tuples to let through, between 0 and 1 inclusive. Default is 1.0<br>
- * A tuple is emitted unless a random draw from nextDouble() is greater than samplingPercentage<br>
- * <br>
- * <b>Specific compile time checks are</b>: None<br>
- * samplingPercentage is annotated with @Min(0) and @Max(1)<br>
- * The default of 1.0 makes this operator a passthrough (all), which is convenient during testing<br>
- * <br>
- * <br>
- * <b>Specific run time checks are</b>: None<br>
- * <br>
- * </p>
- *
- * @displayName Sampler
- * @category Stats and Aggregations
- * @tags filter
- *
- * @since 0.3.2
- */
-@Stateless
-@OperatorAnnotation(partitionable = true)
-public class Sampler<K> extends BaseKeyOperator<K>
-{
-  /**
-   * This is the input port which receives tuples.
-   */
-  public final transient DefaultInputPort<K> data = new DefaultInputPort<K>()
-  {
-    /**
-     * Emits tuples at a rate corresponding to the given samplingPercentage.
-     */
-    @Override
-    public void process(K tuple)
-    {
-      double val = random.nextDouble();
-      if (val > samplingPercentage) {
-        return;
-      }
-      sample.emit(cloneKey(tuple));
-    }
-  };
-
-  /**
-   * This is the output port which emits the sampled tuples.
-   */
-  public final transient DefaultOutputPort<K> sample = new DefaultOutputPort<K>();
-
-  @Min(0)
-  @Max(1)
-  private double samplingPercentage = 1.0;
-
-  private transient Random random = new Random();
-
-  /**
-   * Gets the samplingPercentage.
-   * @return the samplingPercentage
-   */
-  public double getSamplingPercentage()
-  {
-    return samplingPercentage;
-  }
-
-  /**
-   * The percentage of tuples to allow to pass through this operator. This percentage should be
-   * a number between 0 and 1 inclusive.
-   * @param samplingPercentage the samplingPercentage to set
-   */
-  public void setSamplingPercentage(double samplingPercentage)
-  {
-    this.samplingPercentage = samplingPercentage;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/TopNUnique.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/TopNUnique.java b/library/src/main/java/com/datatorrent/lib/algo/TopNUnique.java
index 3c6a9f5..d80f1c1 100644
--- a/library/src/main/java/com/datatorrent/lib/algo/TopNUnique.java
+++ b/library/src/main/java/com/datatorrent/lib/algo/TopNUnique.java
@@ -54,8 +54,9 @@ import com.datatorrent.lib.util.AbstractBaseNUniqueOperatorMap;
  * @tags filter, rank
  *
  * @since 0.3.2
+ * @deprecated
  */
-
+@Deprecated
 @OperatorAnnotation(partitionable = false)
 public class TopNUnique<K, V> extends AbstractBaseNUniqueOperatorMap<K, V>
 {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/join/AntiJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/AntiJoinOperator.java b/library/src/main/java/com/datatorrent/lib/join/AntiJoinOperator.java
index 382a0d6..1eb7957 100644
--- a/library/src/main/java/com/datatorrent/lib/join/AntiJoinOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/join/AntiJoinOperator.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
@@ -32,7 +33,6 @@ import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.lib.streamquery.condition.Condition;
 import com.datatorrent.lib.streamquery.index.Index;
 
-
 /**
  * An implementation of Operator that reads table row data from two table data input ports. <br>
  * <p>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/Average.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Average.java b/library/src/main/java/com/datatorrent/lib/math/Average.java
index 4dfdf1f..ff7a9d7 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Average.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Average.java
@@ -20,14 +20,14 @@ package com.datatorrent.lib.math;
 
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.Pair;
 import com.datatorrent.lib.util.BaseNumberValueOperator;
 
 /**
  *
  * Emits the average of values at the end of window.
  * <p>
- * This is an end window operator. This can not be partitioned. Partitioning
- * this will yield incorrect result.<br>
+ * This is an end window operator. <br>
  * <b>Ports</b>:<br>
  * <b>data</b>: expects V extends Number<br>
  * <b>average</b>: emits V extends Number<br>
@@ -63,7 +63,14 @@ public class Average<V extends Number> extends BaseNumberValueOperator<V>
   /**
    * Output port that emits average as a number.
    */
-  public final transient DefaultOutputPort<V> average = new DefaultOutputPort<V>();
+  public final transient DefaultOutputPort<Pair<V,Long>> average = new DefaultOutputPort<Pair<V, Long>>()
+  {
+    @Override
+    public Unifier<Pair<V, Long>> getUnifier()
+    {
+      return new AvgUnifier<V>();
+    }
+  };
 
   protected double sums = 0;
   protected long counts = 0;
@@ -75,13 +82,27 @@ public class Average<V extends Number> extends BaseNumberValueOperator<V>
   public void endWindow()
   {
     // May want to send out only if count != 0
+
     if (counts != 0) {
-      average.emit(getAverage());
+      Pair<V,Long> pair = new Pair<>(getAverage(),counts);
+      average.emit(pair);
     }
+
     sums = 0;
     counts = 0;
   }
 
+  public static class AvgUnifier<V extends Number> extends Average<V> implements Unifier<Pair<V, Long>>
+  {
+
+    @Override
+    public void process(Pair<V, Long> pair)
+    {
+      sums += pair.getFirst().doubleValue() * pair.getSecond();
+      counts += pair.getSecond();
+    }
+  }
+
   /**
    * Calculate average based on number type.
    */

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/Change.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Change.java b/library/src/main/java/com/datatorrent/lib/math/Change.java
deleted file mode 100644
index 57bad6b..0000000
--- a/library/src/main/java/com/datatorrent/lib/math/Change.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.lib.util.BaseNumberValueOperator;
-
-/**
- * Operator compares data values arriving on the data input port with the base value received on the base input port.
- * 
- * <p>
- * Arriving base value is stored in operator for comparison, old base value is overwritten.&nbsp;
- * This emits &lt;change in value,percentage change&gt;.
- * Operator expects values arriving on data input port and base value input operator.
- * Change in value and percentage change in values are emitted on separate ports.<br>
- * This operator can not be partitioned, since copies won't get consecutive operators. <br>
- * This is StateFull operator, tuples that arrive on base port are kept in
- * cache forever.<br>
- * <br>
- * <b>Input Ports</b>:<br>
- * <b>data</b>: expects V extends Number, Data values<br>
- * <b>base</b>: expects V extends Number, Base Value stored for comparison<br>
- *
- * <b>Output Ports</b>:<br>
- * <b>change</b>: emits V extends Number,  Diff from base value<br>
- * <b>percent</b>: emits Double, percent change in value compared to base value.<br>
- * <br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>inverse</b>: if set to true the key in the filter will block tuple<br>
- * <b>filterBy</b>: List of keys to filter on<br>
- * <br>
- * <b>Specific compile time checks</b>: None<br>
- * <b>Specific run time checks</b>: None<br>
- * <br>
- *
- * <br>
- * @displayName Change
- * @category Math
- * @tags change, key value, numeric, percentage
- * @since 0.3.3
- */
-public class Change<V extends Number> extends BaseNumberValueOperator<V>
-{
-        /**
-   * Input data port that takes a number.
-   */
-  public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
-  {
-    /**
-     * Process each key, compute change or percent, and emit it.
-     */
-    @Override
-    public void process(V tuple)
-    {
-      if (baseValue != 0) { // Avoid divide by zero, Emit an error tuple?
-        double cval = tuple.doubleValue() - baseValue;
-        change.emit(getValue(cval));
-        percent.emit((cval / baseValue) * 100);
-      }
-    }
-  };
-        
-        /**
-   * Input port that takes a number&nbsp; It stores the value for base comparison.
-   */
-  public final transient DefaultInputPort<V> base = new DefaultInputPort<V>()
-  {
-    /**
-     * Process each key to store the value. If same key appears again update
-     * with latest value.
-     */
-    @Override
-    public void process(V tuple)
-    {
-      if (tuple.doubleValue() != 0.0) { // Avoid divide by zero, Emit an error
-                                        // tuple?
-        baseValue = tuple.doubleValue();
-      }
-    }
-  };
-
-  /**
-   * Output port that emits change in value compared to base value.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<V> change = new DefaultOutputPort<V>();
-
-  /**
-   * Output port that emits percent change in data value compared to base value.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<Double> percent = new DefaultOutputPort<Double>();
-
-  /**
-   * baseValue is a state full field. It is retained across windows.
-   */
-  private double baseValue = 0;
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/ChangeAlert.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeAlert.java b/library/src/main/java/com/datatorrent/lib/math/ChangeAlert.java
deleted file mode 100644
index 3c48016..0000000
--- a/library/src/main/java/com/datatorrent/lib/math/ChangeAlert.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import javax.validation.constraints.Min;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.lib.util.BaseNumberValueOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- * Compares consecutive input data values, emits &lt;value,percent change value&gt; pair on alert output port, if percent change exceeds certain threshold value.
- * <p>
- * Operator is StateFull since current value is stored for comparison in next window. <br>
- * This operator can not be partitioned, partitioning will result in inconsistent base value
- * across replicated copies.
- * <br>
- *
- * <b>Ports</b>:<br>
- * <b>data</b>: expects V extends Number<br>
- * <b>alert</b>: emits KeyValPair&lt;V,Double&gt;(1)<br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>threshold</b>: The threshold of change between consecutive tuples of the
- * same key that triggers an alert tuple<br>
- * <b>inverse</b>: if set to true the key in the filter will block tuple<br>
- * <b>filterBy</b>: List of keys to filter on<br>
- * <br>
- * <b>Specific compile time checks</b>: None<br>
- * <b>Specific run time checks</b>: None<br>
- * <br>
- * @displayName Change Alert
- * @category Rules and Alerts
- * @tags change, key value, numeric, percentage
- * @since 0.3.3
- */
-public class ChangeAlert<V extends Number> extends BaseNumberValueOperator<V>
-{
-  /**
-   * Input port that takes in a number.
-   */
-  public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
-  {
-    /**
-     * Process each key, compute change or percent, and emit it. If we get 0 as
-     * tuple next will be skipped.
-     */
-    @Override
-    public void process(V tuple)
-    {
-      double tval = tuple.doubleValue();
-      if (baseValue == 0) { // Avoid divide by zero, Emit an error tuple?
-        baseValue = tval;
-        return;
-      }
-      double change = tval - baseValue;
-      double percent = (change / baseValue) * 100;
-      if (percent < 0.0) {
-        percent = 0.0 - percent;
-      }
-      if (percent > percentThreshold) {
-        KeyValPair<V, Double> kv = new KeyValPair<V, Double>(cloneKey(tuple),
-            percent);
-        alert.emit(kv);
-      }
-      baseValue = tval;
-    }
-  };
-
-
-  /**
-   * Output port which emits a key value pair.
-   */
-  public final transient DefaultOutputPort<KeyValPair<V, Double>> alert = new DefaultOutputPort<KeyValPair<V, Double>>();
-
-  /**
-   * baseValue is a state full field. It is retained across windows
-   */
-  private double baseValue = 0;
-  @Min(1)
-  private double percentThreshold = 0.0;
-
-  /**
-   * getter function for threshold value
-   *
-   * @return threshold value
-   */
-  @Min(1)
-  public double getPercentThreshold()
-  {
-    return percentThreshold;
-  }
-
-  /**
-   * setter function for threshold value
-   */
-  public void setPercentThreshold(double d)
-  {
-    percentThreshold = d;
-  }
-}