You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/09/01 18:30:47 UTC

[03/12] apex-malhar git commit: Updated algo & working on math operators

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/index/UnaryExpression.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/UnaryExpression.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/UnaryExpression.java
deleted file mode 100644
index 45e90ec..0000000
--- a/library/src/main/java/com/datatorrent/lib/streamquery/index/UnaryExpression.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery.index;
-
-import javax.validation.constraints.NotNull;
-
-
-/**
- * A base implementation of an index that filters row by unary expression.  Subclasses should provide the
-   implementation of filter/getExpressionName functions.
- * <p>
- * Sub class will implement filter/getExpressionName functions.
- * @displayName Unary Expression
- * @category Stream Manipulators
- * @tags unary, alias
- * @since 0.3.4
- */
-public abstract class UnaryExpression  implements Index
-{
-  /**
-   * Column name argument for unary expression.
-   */
-  @NotNull
-  protected String column;
-
-  /**
-   *  Alias name for output field.
-   */
-  protected String alias;
-
-  /**
-   * @param column name argument for unary expression.
-   * @param alias name for output field.
-   */
-  public UnaryExpression(@NotNull String column, String alias)
-  {
-    this.column = column;
-  }
-
-  public String getColumn()
-  {
-    return column;
-  }
-
-  public void setColumn(String column)
-  {
-    this.column = column;
-  }
-
-  public String getAlias()
-  {
-    return alias;
-  }
-
-  public void setAlias(String alias)
-  {
-    this.alias = alias;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/package-info.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/package-info.java b/library/src/main/java/com/datatorrent/lib/streamquery/package-info.java
deleted file mode 100644
index 489915f..0000000
--- a/library/src/main/java/com/datatorrent/lib/streamquery/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Library of operators for streaming query language.
- */
-@org.apache.hadoop.classification.InterfaceStability.Evolving
-package com.datatorrent.lib.streamquery;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/algo/AbstractStreamPatternMatcherTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/AbstractStreamPatternMatcherTest.java b/library/src/test/java/com/datatorrent/lib/algo/AbstractStreamPatternMatcherTest.java
deleted file mode 100644
index a3d3019..0000000
--- a/library/src/test/java/com/datatorrent/lib/algo/AbstractStreamPatternMatcherTest.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.algo.AbstractStreamPatternMatcher}<p>
- *
- */
-
-import java.util.Random;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-
-
-public class AbstractStreamPatternMatcherTest
-{
-
-  public static class StreamPatternMatcher<T> extends AbstractStreamPatternMatcher<T>
-  {
-    @Override
-    public void processPatternFound()
-    {
-      outputPort.emit(getPattern().getStates());
-    }
-
-    public transient DefaultOutputPort<T[]> outputPort = new DefaultOutputPort<T[]>();
-  }
-
-  private StreamPatternMatcher<Integer> streamPatternMatcher;
-  private AbstractStreamPatternMatcher.Pattern<Integer> pattern;
-  private Integer[] inputPattern;
-  private CollectorTestSink<Object> sink;
-
-  @Before
-  public void setup()
-  {
-    streamPatternMatcher = new StreamPatternMatcher<Integer>();
-    sink = new CollectorTestSink<Object>();
-    streamPatternMatcher.outputPort.setSink(sink);
-  }
-
-  @After
-  public void cleanup()
-  {
-    streamPatternMatcher.teardown();
-    sink.collectedTuples.clear();
-  }
-
-  @Test
-  public void test() throws Exception
-  {
-    inputPattern = new Integer[]{0, 1, 0, 1, 2};
-    pattern = new AbstractStreamPatternMatcher.Pattern<Integer>(inputPattern);
-    streamPatternMatcher.setPattern(pattern);
-    streamPatternMatcher.setup(null);
-    streamPatternMatcher.beginWindow(0);
-    streamPatternMatcher.inputPort.process(0);
-    streamPatternMatcher.inputPort.process(1);
-    streamPatternMatcher.inputPort.process(1);
-    streamPatternMatcher.inputPort.process(0);
-    streamPatternMatcher.inputPort.process(1);
-    streamPatternMatcher.inputPort.process(0);
-    streamPatternMatcher.inputPort.process(1);
-    streamPatternMatcher.inputPort.process(2);
-    streamPatternMatcher.inputPort.process(1);
-    streamPatternMatcher.endWindow();
-    Assert.assertEquals("The number of tuples emitted is one", 1, sink.collectedTuples.size());
-    Assert.assertEquals("Matching the output pattern with input pattern", inputPattern, sink.collectedTuples.get(0));
-  }
-
-  @Test
-  public void testSimplePattern() throws Exception
-  {
-    inputPattern = new Integer[]{0, 0};
-    pattern = new AbstractStreamPatternMatcher.Pattern<Integer>(inputPattern);
-    streamPatternMatcher.setPattern(pattern);
-    streamPatternMatcher.setup(null);
-    streamPatternMatcher.beginWindow(0);
-    streamPatternMatcher.inputPort.process(0);
-    streamPatternMatcher.inputPort.process(0);
-    streamPatternMatcher.inputPort.process(0);
-    streamPatternMatcher.inputPort.process(1);
-    streamPatternMatcher.inputPort.process(0);
-    streamPatternMatcher.inputPort.process(0);
-    streamPatternMatcher.endWindow();
-    Assert.assertEquals("The number of tuples emitted are three", 3, sink.collectedTuples.size());
-    for (Object object : sink.collectedTuples) {
-      Assert.assertEquals("Matching the output pattern with input pattern", inputPattern, object);
-    }
-  }
-
-  @Test
-  public void testPatternWithSingleState() throws Exception
-  {
-    inputPattern = new Integer[]{0};
-    pattern = new AbstractStreamPatternMatcher.Pattern<Integer>(inputPattern);
-    streamPatternMatcher.setPattern(pattern);
-    streamPatternMatcher.setup(null);
-    streamPatternMatcher.beginWindow(0);
-    streamPatternMatcher.inputPort.process(0);
-    streamPatternMatcher.inputPort.process(0);
-    streamPatternMatcher.inputPort.process(0);
-    streamPatternMatcher.inputPort.process(1);
-    streamPatternMatcher.inputPort.process(0);
-    streamPatternMatcher.inputPort.process(0);
-    streamPatternMatcher.endWindow();
-    Assert.assertEquals("The number of tuples emitted are three", 5, sink.collectedTuples.size());
-    for (Object object : sink.collectedTuples) {
-      Assert.assertEquals("Matching the output pattern with input pattern", inputPattern, object);
-    }
-  }
-
-  @Test
-  public void testAutoGeneratedPattern() throws Exception
-  {
-    Random random = new Random();
-    int patternSize = 15;
-    inputPattern = new Integer[patternSize];
-    int max = 10;
-    int min = 1;
-    int primeNumber = 5;
-    for (int i = 0; i < patternSize; i++) {
-      inputPattern[i] = (min + random.nextInt(max));
-    }
-    pattern = new AbstractStreamPatternMatcher.Pattern<Integer>(inputPattern);
-    streamPatternMatcher.setPattern(pattern);
-    streamPatternMatcher.setup(null);
-    streamPatternMatcher.beginWindow(0);
-    int numberOfIterations = 20;
-    for (int i = 0; i < patternSize; i++) {
-      for (int j = 0; j <= i; j++) {
-        streamPatternMatcher.inputPort.process(inputPattern[j]);
-      }
-      for (int k = 0; k < numberOfIterations; k++) {
-        streamPatternMatcher.inputPort.process(max + min + random.nextInt(max));
-      }
-      if (i % primeNumber == 0) {
-        for (int j = 0; j < patternSize; j++) {
-          streamPatternMatcher.inputPort.process(inputPattern[j]);
-        }
-      }
-    }
-    streamPatternMatcher.endWindow();
-    Assert.assertEquals("The number of tuples emitted ", 1 + patternSize / primeNumber, sink.collectedTuples.size());
-    for (Object output : sink.collectedTuples) {
-      Assert.assertEquals("Matching the output pattern with input pattern", inputPattern, output);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/algo/AllAfterMatchMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/AllAfterMatchMapTest.java b/library/src/test/java/com/datatorrent/lib/algo/AllAfterMatchMapTest.java
deleted file mode 100644
index d3d69cf..0000000
--- a/library/src/test/java/com/datatorrent/lib/algo/AllAfterMatchMapTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.algo.AllAfterMatchMapTest}
- * <p>
- */
-public class AllAfterMatchMapTest
-{
-  /**
-   * Test node logic emits correct results
-   */
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    testNodeProcessingSchema(new AllAfterMatchMap<String, Integer>());
-    testNodeProcessingSchema(new AllAfterMatchMap<String, Double>());
-    testNodeProcessingSchema(new AllAfterMatchMap<String, Float>());
-    testNodeProcessingSchema(new AllAfterMatchMap<String, Short>());
-    testNodeProcessingSchema(new AllAfterMatchMap<String, Long>());
-  }
-
-  @SuppressWarnings({ "unchecked", "rawtypes", "unchecked" })
-  public void testNodeProcessingSchema(AllAfterMatchMap oper)
-  {
-    CollectorTestSink allSink = new CollectorTestSink();
-    oper.allafter.setSink(allSink);
-    oper.setKey("a");
-    oper.setValue(3.0);
-    oper.setTypeEQ();
-
-    oper.beginWindow(0);
-    HashMap<String, Number> input = new HashMap<String, Number>();
-    input.put("a", 2);
-    input.put("b", 20);
-    input.put("c", 1000);
-    oper.data.process(input);
-    input.clear();
-    input.put("a", 3);
-    oper.data.process(input);
-
-    input.clear();
-    input.put("b", 6);
-    oper.data.process(input);
-
-    input.clear();
-    input.put("c", 9);
-    oper.data.process(input);
-
-    oper.endWindow();
-
-    Assert.assertEquals("number emitted tuples", 3,
-        allSink.collectedTuples.size());
-    for (Object o : allSink.collectedTuples) {
-      for (Map.Entry<String, Number> e : ((HashMap<String, Number>)o).entrySet()) {
-        if (e.getKey().equals("a")) {
-          Assert.assertEquals("emitted value for 'a' was ", new Double(3), new Double(e.getValue().doubleValue()));
-        } else if (e.getKey().equals("b")) {
-          Assert.assertEquals("emitted tuple for 'b' was ", new Double(6), new Double(e.getValue().doubleValue()));
-        } else if (e.getKey().equals("c")) {
-          Assert.assertEquals("emitted tuple for 'c' was ", new Double(9), new Double(e.getValue().doubleValue()));
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/algo/CompareExceptCountMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/CompareExceptCountMapTest.java b/library/src/test/java/com/datatorrent/lib/algo/CompareExceptCountMapTest.java
index bc4f4d8..c7c15ff 100644
--- a/library/src/test/java/com/datatorrent/lib/algo/CompareExceptCountMapTest.java
+++ b/library/src/test/java/com/datatorrent/lib/algo/CompareExceptCountMapTest.java
@@ -21,16 +21,16 @@ package com.datatorrent.lib.algo;
 import java.util.HashMap;
 
 import org.junit.Assert;
-
 import org.junit.Test;
 
 import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
 
 /**
- *
+ * @deprecated
  * Functional tests for {@link com.datatorrent.lib.algo.CompareExceptCountMap} <p>
- *
+ * (Deprecating inclass) Comment: CompareExceptCountMap is deprecated.
  */
+@Deprecated
 public class CompareExceptCountMapTest
 {
   /**

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/algo/DistinctMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/DistinctMapTest.java b/library/src/test/java/com/datatorrent/lib/algo/DistinctMapTest.java
deleted file mode 100644
index 249c39d..0000000
--- a/library/src/test/java/com/datatorrent/lib/algo/DistinctMapTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Assert;
-
-import org.junit.Test;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.algo.DistinctMap}<p>
- *
- */
-public class DistinctMapTest
-{
-  /**
-   * Test node logic emits correct results
-   */
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    DistinctMap<String, Number> oper = new DistinctMap<String, Number>();
-
-    CollectorTestSink sortSink = new CollectorTestSink();
-    oper.distinct.setSink(sortSink);
-
-
-    oper.beginWindow(0);
-    HashMap<String, Number> input = new HashMap<String, Number>();
-
-    input.put("a", 2);
-    oper.data.process(input);
-    input.clear();
-    input.put("a", 2);
-    oper.data.process(input);
-
-    input.clear();
-    input.put("a", 1000);
-    oper.data.process(input);
-
-    input.clear();
-    input.put("a", 5);
-    oper.data.process(input);
-
-    input.clear();
-    input.put("a", 2);
-    input.put("b", 33);
-    oper.data.process(input);
-
-    input.clear();
-    input.put("a", 33);
-    input.put("b", 34);
-    oper.data.process(input);
-
-    input.clear();
-    input.put("b", 34);
-    oper.data.process(input);
-
-    input.clear();
-    input.put("b", 6);
-    input.put("a", 2);
-    oper.data.process(input);
-    input.clear();
-    input.put("c", 9);
-    oper.data.process(input);
-    oper.endWindow();
-
-    Assert.assertEquals("number emitted tuples", 8, sortSink.collectedTuples.size());
-    int aval = 0;
-    int bval = 0;
-    int cval = 0;
-    for (Object o: sortSink.collectedTuples) {
-      for (Map.Entry<String, Integer> e: ((HashMap<String, Integer>)o).entrySet()) {
-        String key = e.getKey();
-        if (key.equals("a")) {
-          aval += e.getValue();
-        } else if (key.equals("b")) {
-          bval += e.getValue();
-        } else if (key.equals("c")) {
-          cval += e.getValue();
-        }
-      }
-    }
-    Assert.assertEquals("Total for key \"a\" ", 1040, aval);
-    Assert.assertEquals("Total for key \"a\" ", 73, bval);
-    Assert.assertEquals("Total for key \"a\" ", 9, cval);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/algo/FilterKeyValsTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/FilterKeyValsTest.java b/library/src/test/java/com/datatorrent/lib/algo/FilterKeyValsTest.java
deleted file mode 100644
index 7069533..0000000
--- a/library/src/test/java/com/datatorrent/lib/algo/FilterKeyValsTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.algo.FilterKeyVals}<p>
- *
- */
-public class FilterKeyValsTest
-{
-  @SuppressWarnings("unchecked")
-  int getTotal(List<Object> list)
-  {
-    int ret = 0;
-    for (Object map: list) {
-      for (Map.Entry<String, Number> e: ((HashMap<String, Number>)map).entrySet()) {
-        ret += e.getValue().intValue();
-      }
-    }
-    return ret;
-  }
-
-  /**
-   * Test node logic emits correct results
-   */
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    FilterKeyVals<String,Number> oper = new FilterKeyVals<String,Number>();
-
-    CollectorTestSink sortSink = new CollectorTestSink();
-    oper.filter.setSink(sortSink);
-    HashMap<String,Number> filter = new HashMap<String,Number>();
-    filter.put("b",2);
-    oper.setKeyVals(filter);
-    oper.clearKeys();
-
-    filter.clear();
-    filter.put("e", 200);
-    filter.put("f", 2);
-    filter.put("blah", 2);
-    oper.setKeyVals(filter);
-    filter.clear();
-    filter.put("a", 2);
-    oper.setKeyVals(filter);
-
-    oper.beginWindow(0);
-    HashMap<String, Number> input = new HashMap<String, Number>();
-    input.put("a", 2);
-    input.put("b", 5);
-    input.put("c", 7);
-    input.put("d", 42);
-    input.put("e", 202);
-    input.put("e", 200);
-    input.put("f", 2);
-    oper.data.process(input);
-    Assert.assertEquals("number emitted tuples", 3, sortSink.collectedTuples.size());
-    Assert.assertEquals("Total filtered value is ", 204, getTotal(sortSink.collectedTuples));
-    sortSink.clear();
-
-    input.clear();
-    input.put("a", 5);
-    oper.data.process(input);
-    Assert.assertEquals("number emitted tuples", 0, sortSink.collectedTuples.size());
-    sortSink.clear();
-
-    input.clear();
-    input.put("a", 2);
-    input.put("b", 33);
-    input.put("f", 2);
-    oper.data.process(input);
-    Assert.assertEquals("number emitted tuples", 2, sortSink.collectedTuples.size());
-    Assert.assertEquals("Total filtered value is ", 4, getTotal(sortSink.collectedTuples));
-    sortSink.clear();
-
-    input.clear();
-    input.put("b", 6);
-    input.put("a", 2);
-    input.put("j", 6);
-    input.put("e", 2);
-    input.put("dd", 6);
-    input.put("blah", 2);
-    input.put("another", 6);
-    input.put("notmakingit", 2);
-    oper.data.process(input);
-    Assert.assertEquals("number emitted tuples", 2, sortSink.collectedTuples.size());
-    Assert.assertEquals("Total filtered value is ", 4, getTotal(sortSink.collectedTuples));
-    sortSink.clear();
-
-    input.clear();
-    input.put("c", 9);
-    oper.setInverse(true);
-    oper.data.process(input);
-    Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
-    Assert.assertEquals("Total filtered value is ", 9, getTotal(sortSink.collectedTuples));
-
-    oper.endWindow();
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/algo/FilterKeysHashMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/FilterKeysHashMapTest.java b/library/src/test/java/com/datatorrent/lib/algo/FilterKeysHashMapTest.java
deleted file mode 100644
index 3ef30f8..0000000
--- a/library/src/test/java/com/datatorrent/lib/algo/FilterKeysHashMapTest.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Assert;
-
-import org.junit.Test;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.algo.FilterKeysHashMap}<p>
- *
- */
-public class FilterKeysHashMapTest
-{
-  @SuppressWarnings("unchecked")
-  int getTotal(Object o)
-  {
-    HashMap<String, HashMap<String, Number>> map = (HashMap<String, HashMap<String, Number>>)o;
-    int ret = 0;
-    for (Map.Entry<String, HashMap<String, Number>> e: map.entrySet()) {
-      for (Map.Entry<String, Number> e2: e.getValue().entrySet()) {
-        ret += e2.getValue().intValue();
-      }
-    }
-    return ret;
-  }
-
-  /**
-   * Test node logic emits correct results
-   */
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    FilterKeysHashMap<String, Number> oper = new FilterKeysHashMap<String, Number>();
-
-    CollectorTestSink sortSink = new CollectorTestSink();
-    oper.filter.setSink(sortSink);
-    oper.setKey("b");
-    oper.clearKeys();
-    String[] keys = new String[3];
-    keys[0] = "e";
-    keys[1] = "f";
-    keys[2] = "blah";
-    oper.setKey("a");
-    oper.setKeys(keys);
-
-    oper.beginWindow(0);
-    HashMap<String, HashMap<String, Number>> inputA = new HashMap<String, HashMap<String, Number>>();
-    HashMap<String, Number> input = new HashMap<String, Number>();
-    HashMap<String, Number> input2 = new HashMap<String, Number>();
-
-    input.put("a", 2);
-    input.put("b", 5);
-    input.put("c", 7);
-    input.put("d", 42);
-    input.put("e", 200);
-    input.put("f", 2);
-    inputA.put("A", input);
-    oper.data.process(inputA);
-    Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
-    Assert.assertEquals("Total filtered value is ", 204, getTotal(sortSink.collectedTuples.get(0)));
-    sortSink.clear();
-
-    input.clear();
-    inputA.clear();
-    input.put("a", 5);
-    inputA.put("A", input);
-    oper.data.process(inputA);
-    Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
-    Assert.assertEquals("Total filtered value is ", 5, getTotal(sortSink.collectedTuples.get(0)));
-    sortSink.clear();
-
-    input.clear();
-    inputA.clear();
-    input.put("a", 2);
-    input.put("b", 33);
-    input.put("f", 2);
-    inputA.put("A", input);
-    oper.data.process(inputA);
-    Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
-    Assert.assertEquals("Total filtered value is ", 4, getTotal(sortSink.collectedTuples.get(0)));
-    sortSink.clear();
-
-    input.clear();
-    inputA.clear();
-    input.put("b", 6);
-    input.put("a", 2);
-    input.put("j", 6);
-    input.put("e", 2);
-    input.put("dd", 6);
-    input.put("blah", 2);
-    input.put("another", 6);
-    input.put("notmakingit", 2);
-    inputA.put("A", input);
-    oper.data.process(inputA);
-    Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
-    Assert.assertEquals("Total filtered value is ", 6, getTotal(sortSink.collectedTuples.get(0)));
-    sortSink.clear();
-
-    input.clear();
-    inputA.clear();
-    input.put("c", 9);
-    oper.setInverse(true);
-    inputA.put("A", input);
-    oper.data.process(inputA);
-    Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
-    Assert.assertEquals("Total filtered value is ", 9, getTotal(sortSink.collectedTuples.get(0)));
-    sortSink.clear();
-
-    input.clear();
-    input2.clear();
-    inputA.clear();
-    input.put("e", 2); // pass
-    input.put("c", 9);
-    input2.put("a", 5); // pass
-    input2.put("p", 8);
-    oper.setInverse(false);
-    inputA.put("A", input);
-    inputA.put("B", input2);
-    oper.data.process(inputA);
-    Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
-    Assert.assertEquals("Total filtered value is ", 7, getTotal(sortSink.collectedTuples.get(0)));
-    sortSink.clear();
-
-    oper.endWindow();
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/algo/FilterKeysMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/FilterKeysMapTest.java b/library/src/test/java/com/datatorrent/lib/algo/FilterKeysMapTest.java
deleted file mode 100644
index f00652e..0000000
--- a/library/src/test/java/com/datatorrent/lib/algo/FilterKeysMapTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.algo.FilterKeysMap}<p>
- *
- */
-public class FilterKeysMapTest
-{
-  @SuppressWarnings("unchecked")
-  int getTotal(Object o)
-  {
-    HashMap<String, Number> map = (HashMap<String, Number>)o;
-    int ret = 0;
-    for (Map.Entry<String, Number> e: map.entrySet()) {
-      ret += e.getValue().intValue();
-    }
-    return ret;
-  }
-
-  /**
-   * Test node logic emits correct results
-   */
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    FilterKeysMap<String,Number> oper = new FilterKeysMap<String,Number>();
-
-    CollectorTestSink sortSink = new CollectorTestSink();
-    oper.filter.setSink(sortSink);
-    oper.setKey("b");
-    oper.clearKeys();
-    String[] keys = new String[3];
-    keys[0] = "e";
-    keys[1] = "f";
-    keys[2] = "blah";
-    oper.setKey("a");
-    oper.setKeys(keys);
-
-    oper.beginWindow(0);
-    HashMap<String, Number> input = new HashMap<String, Number>();
-
-    input.put("a", 2);
-    input.put("b", 5);
-    input.put("c", 7);
-    input.put("d", 42);
-    input.put("e", 200);
-    input.put("f", 2);
-    oper.data.process(input);
-    Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
-    Assert.assertEquals("Total filtered value is ", 204, getTotal(sortSink.collectedTuples.get(0)));
-    sortSink.clear();
-
-    input.clear();
-    input.put("a", 5);
-    oper.data.process(input);
-    Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
-    Assert.assertEquals("Total filtered value is ", 5, getTotal(sortSink.collectedTuples.get(0)));
-    sortSink.clear();
-
-    input.clear();
-    input.put("a", 2);
-    input.put("b", 33);
-    input.put("f", 2);
-    oper.data.process(input);
-    Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
-    Assert.assertEquals("Total filtered value is ", 4, getTotal(sortSink.collectedTuples.get(0)));
-    sortSink.clear();
-
-    input.clear();
-    input.put("b", 6);
-    input.put("a", 2);
-    input.put("j", 6);
-    input.put("e", 2);
-    input.put("dd", 6);
-    input.put("blah", 2);
-    input.put("another", 6);
-    input.put("notmakingit", 2);
-    oper.data.process(input);
-    Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
-    Assert.assertEquals("Total filtered value is ", 6, getTotal(sortSink.collectedTuples.get(0)));
-    sortSink.clear();
-
-    input.clear();
-    input.put("c", 9);
-    oper.setInverse(true);
-    oper.data.process(input);
-    Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
-    Assert.assertEquals("Total filtered value is ", 9, getTotal(sortSink.collectedTuples.get(0)));
-
-    oper.endWindow();
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/algo/FirstMatchMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/FirstMatchMapTest.java b/library/src/test/java/com/datatorrent/lib/algo/FirstMatchMapTest.java
deleted file mode 100644
index 2ec53d8..0000000
--- a/library/src/test/java/com/datatorrent/lib/algo/FirstMatchMapTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.HashMap;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.algo.FirstMatchMap}<p>
- *
- */
-public class FirstMatchMapTest
-{
-  /**
-   * Test node logic emits correct results
-   */
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    testNodeProcessingSchema(new FirstMatchMap<String, Integer>());
-    testNodeProcessingSchema(new FirstMatchMap<String, Double>());
-    testNodeProcessingSchema(new FirstMatchMap<String, Float>());
-    testNodeProcessingSchema(new FirstMatchMap<String, Short>());
-    testNodeProcessingSchema(new FirstMatchMap<String, Long>());
-  }
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  public void testNodeProcessingSchema(FirstMatchMap oper)
-  {
-    CountAndLastTupleTestSink matchSink = new CountAndLastTupleTestSink();
-    oper.first.setSink(matchSink);
-    oper.setKey("a");
-    oper.setValue(3);
-    oper.setTypeEQ();
-
-    oper.beginWindow(0);
-    HashMap<String, Number> input = new HashMap<String, Number>();
-    input.put("a", 4);
-    input.put("b", 20);
-    input.put("c", 1000);
-    oper.data.process(input);
-    input.put("a", 3);
-    input.put("b", 20);
-    input.put("c", 1000);
-    oper.data.process(input);
-    input.clear();
-    input.put("a", 2);
-    oper.data.process(input);
-    input.clear();
-    input.put("a", 4);
-    input.put("b", 21);
-    input.put("c", 1000);
-    oper.data.process(input);
-    input.clear();
-    input.put("a", 4);
-    input.put("b", 20);
-    input.put("c", 5);
-    oper.data.process(input);
-    oper.endWindow();
-
-    Assert.assertEquals("number emitted tuples", 1, matchSink.count);
-    HashMap<String, Number> tuple = (HashMap<String, Number>)matchSink.tuple;
-    Number aval = tuple.get("a");
-    Assert.assertEquals("Value of a was ", 3, aval.intValue());
-    matchSink.clear();
-
-    oper.beginWindow(0);
-    input.clear();
-    input.put("a", 2);
-    input.put("b", 20);
-    input.put("c", 1000);
-    oper.data.process(input);
-    input.clear();
-    input.put("a", 5);
-    oper.data.process(input);
-    oper.endWindow();
-    // There should be no emit as all tuples do not match
-    Assert.assertEquals("number emitted tuples", 0, matchSink.count);
-    matchSink.clear();
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/algo/FirstNTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/FirstNTest.java b/library/src/test/java/com/datatorrent/lib/algo/FirstNTest.java
deleted file mode 100644
index e118459..0000000
--- a/library/src/test/java/com/datatorrent/lib/algo/FirstNTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Assert;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.algo.FirstN}<p>
- */
-public class FirstNTest
-{
-  private static Logger log = LoggerFactory.getLogger(FirstNTest.class);
-
-  /**
-   * Test node logic emits correct results
-   */
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    testNodeProcessingSchema(new FirstN<String, Integer>());
-    testNodeProcessingSchema(new FirstN<String, Double>());
-    testNodeProcessingSchema(new FirstN<String, Float>());
-    testNodeProcessingSchema(new FirstN<String, Short>());
-    testNodeProcessingSchema(new FirstN<String, Long>());
-  }
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  public void testNodeProcessingSchema(FirstN oper)
-  {
-    CollectorTestSink sortSink = new CollectorTestSink();
-    oper.first.setSink(sortSink);
-    oper.setN(3);
-
-    oper.beginWindow(0);
-    HashMap<String, Number> input = new HashMap<String, Number>();
-
-    input.put("a", 2);
-    oper.data.process(input);
-
-    input.clear();
-    input.put("a", 20);
-    oper.data.process(input);
-
-    input.clear();
-    input.put("a", 1000);
-    oper.data.process(input);
-
-    input.clear();
-    input.put("a", 5);
-    oper.data.process(input);
-
-    input.clear();
-    input.put("a", 20);
-    input.put("b", 33);
-    oper.data.process(input);
-
-    input.clear();
-    input.put("a", 33);
-    input.put("b", 34);
-    oper.data.process(input);
-
-    input.clear();
-    input.put("b", 34);
-    input.put("a", 1001);
-    oper.data.process(input);
-
-    input.clear();
-    input.put("b", 6);
-    input.put("a", 1);
-    oper.data.process(input);
-    input.clear();
-    input.put("c", 9);
-    oper.data.process(input);
-    oper.endWindow();
-
-    Assert.assertEquals("number emitted tuples", 7, sortSink.collectedTuples.size());
-    int aval = 0;
-    int bval = 0;
-    int cval = 0;
-    for (Object o : sortSink.collectedTuples) {
-      for (Map.Entry<String, Number> e : ((HashMap<String, Number>)o).entrySet()) {
-        if (e.getKey().equals("a")) {
-          aval += e.getValue().intValue();
-        } else if (e.getKey().equals("b")) {
-          bval += e.getValue().intValue();
-        } else if (e.getKey().equals("c")) {
-          cval += e.getValue().intValue();
-        }
-      }
-    }
-    Assert.assertEquals("Value of \"a\" was ", 1022, aval);
-    Assert.assertEquals("Value of \"a\" was ", 101, bval);
-    Assert.assertEquals("Value of \"a\" was ", 9, cval);
-    log.debug("Done testing round\n");
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/algo/FirstTillMatchTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/FirstTillMatchTest.java b/library/src/test/java/com/datatorrent/lib/algo/FirstTillMatchTest.java
deleted file mode 100644
index 8de42d1..0000000
--- a/library/src/test/java/com/datatorrent/lib/algo/FirstTillMatchTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.HashMap;
-
-import org.junit.Assert;
-
-import org.junit.Test;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.algo.FirstTillMatch}<p>
- *
- */
-public class FirstTillMatchTest
-{
-  /**
-   * Test node logic emits correct results
-   */
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    testNodeProcessingSchema(new FirstTillMatch<String, Integer>());
-    testNodeProcessingSchema(new FirstTillMatch<String, Double>());
-    testNodeProcessingSchema(new FirstTillMatch<String, Float>());
-    testNodeProcessingSchema(new FirstTillMatch<String, Short>());
-    testNodeProcessingSchema(new FirstTillMatch<String, Long>());
-  }
-
-  @SuppressWarnings( {"unchecked", "rawtypes"})
-  public void testNodeProcessingSchema(FirstTillMatch oper)
-  {
-    CollectorTestSink matchSink = new CollectorTestSink();
-    oper.first.setSink(matchSink);
-    oper.setKey("a");
-    oper.setValue(3);
-    oper.setTypeEQ();
-
-    oper.beginWindow(0);
-    HashMap<String, Number> input = new HashMap<String, Number>();
-    input.put("a", 4);
-    input.put("b", 20);
-    input.put("c", 1000);
-    oper.data.process(input);
-    input.clear();
-    input.put("a", 2);
-    oper.data.process(input);
-    input.put("a", 3);
-    input.put("b", 20);
-    input.put("c", 1000);
-    oper.data.process(input);
-    input.clear();
-    input.put("a", 4);
-    input.put("b", 21);
-    input.put("c", 1000);
-    oper.data.process(input);
-    input.clear();
-    input.put("a", 6);
-    input.put("b", 20);
-    input.put("c", 5);
-    oper.data.process(input);
-    oper.endWindow();
-
-    Assert.assertEquals("number emitted tuples", 2, matchSink.collectedTuples.size());
-    int atotal = 0;
-    for (Object o: matchSink.collectedTuples) {
-      atotal += ((HashMap<String,Number>)o).get("a").intValue();
-    }
-    Assert.assertEquals("Value of a was ", 6, atotal);
-    matchSink.clear();
-
-    oper.beginWindow(0);
-    input.clear();
-    input.put("a", 2);
-    input.put("b", 20);
-    input.put("c", 1000);
-    oper.data.process(input);
-    input.clear();
-    input.put("a", 5);
-    oper.data.process(input);
-    oper.endWindow();
-    // There should be no emit as all tuples do not match
-    Assert.assertEquals("number emitted tuples", 2, matchSink.collectedTuples.size());
-    atotal = 0;
-    for (Object o: matchSink.collectedTuples) {
-      atotal += ((HashMap<String,Number>)o).get("a").intValue();
-    }
-    Assert.assertEquals("Value of a was ", 7, atotal);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/algo/InsertSortDescTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/InsertSortDescTest.java b/library/src/test/java/com/datatorrent/lib/algo/InsertSortDescTest.java
deleted file mode 100644
index 1dc5a4f..0000000
--- a/library/src/test/java/com/datatorrent/lib/algo/InsertSortDescTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.algo.InsertSortDesc}<p>
- */
-public class InsertSortDescTest
-{
-  private static Logger log = LoggerFactory.getLogger(InsertSortDescTest.class);
-
-  /**
-   * Test node logic emits correct results
-   */
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    testNodeProcessingSchema(new InsertSortDesc<Integer>(), "Integer");
-    testNodeProcessingSchema(new InsertSortDesc<Double>(), "Double");
-    testNodeProcessingSchema(new InsertSortDesc<Float>(), "Float");
-    testNodeProcessingSchema(new InsertSortDesc<String>(), "String");
-  }
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  public void testNodeProcessingSchema(InsertSortDesc oper, String debug)
-  {
-    //FirstN<String,Float> aoper = new FirstN<String,Float>();
-    CollectorTestSink sortSink = new CollectorTestSink();
-    CollectorTestSink hashSink = new CollectorTestSink();
-    oper.sort.setSink(sortSink);
-    oper.sorthash.setSink(hashSink);
-
-    ArrayList input = new ArrayList();
-
-    oper.beginWindow(0);
-
-    input.add(2);
-    oper.datalist.process(input);
-    oper.data.process(20);
-
-    input.clear();
-    input.add(1000);
-    input.add(5);
-    input.add(20);
-    input.add(33);
-    input.add(33);
-    input.add(34);
-    oper.datalist.process(input);
-
-    input.clear();
-    input.add(34);
-    input.add(1001);
-    input.add(6);
-    input.add(1);
-    input.add(33);
-    input.add(9);
-    oper.datalist.process(input);
-    oper.endWindow();
-
-    Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
-    Assert.assertEquals("number emitted tuples", 1, hashSink.collectedTuples.size());
-    HashMap map = (HashMap)hashSink.collectedTuples.get(0);
-    input = (ArrayList)sortSink.collectedTuples.get(0);
-    for (Object o : input) {
-      log.debug(String.format("%s : %s", o.toString(), map.get(o).toString()));
-    }
-    log.debug(String.format("Tested %s type with %d tuples and %d uniques\n", debug, input.size(), map.size()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/algo/InvertIndexArrayTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/InvertIndexArrayTest.java b/library/src/test/java/com/datatorrent/lib/algo/InvertIndexArrayTest.java
deleted file mode 100644
index 9a3ce81..0000000
--- a/library/src/test/java/com/datatorrent/lib/algo/InvertIndexArrayTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.Sink;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.algo.InvertIndex} <p>
- *
- */
-public class InvertIndexArrayTest
-{
-  private static Logger log = LoggerFactory.getLogger(InvertIndexArrayTest.class);
-
-  /**
-   * Test oper logic emits correct results
-   */
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    InvertIndexArray<String,String> oper = new InvertIndexArray<String,String>();
-    CollectorTestSink indexSink = new CollectorTestSink();
-
-    Sink inSink = oper.data.getSink();
-    oper.index.setSink(indexSink);
-
-    oper.beginWindow(0);
-
-    HashMap<String, ArrayList> input = new HashMap<String, ArrayList>();
-    ArrayList<String> alist = new ArrayList<String>();
-    alist.add("str");
-    alist.add("str1");
-    input.put("a", alist);
-    input.put("b", alist);
-    inSink.put(input);
-
-    alist = new ArrayList<String>();
-    input = new HashMap<String, ArrayList>();
-    alist.add("blah");
-    alist.add("str1");
-    input.put("c", alist);
-    inSink.put(input);
-
-    oper.endWindow();
-
-    Assert.assertEquals("number emitted tuples", 3, indexSink.collectedTuples.size());
-    for (Object o: indexSink.collectedTuples) {
-      log.debug(o.toString());
-      HashMap<String, ArrayList<String>> output = (HashMap<String, ArrayList<String>>)o;
-      for (Map.Entry<String, ArrayList<String>> e: output.entrySet()) {
-        String key = e.getKey();
-        alist = e.getValue();
-        if (key.equals("str1")) {
-          Assert.assertEquals("Index for \"str1\" contains \"a\"", true, alist.contains("a"));
-          Assert.assertEquals("Index for \"str1\" contains \"b\"", true, alist.contains("b"));
-          Assert.assertEquals("Index for \"str1\" contains \"c\"", true, alist.contains("c"));
-
-        } else if (key.equals("str")) {
-          Assert.assertEquals("Index for \"str1\" contains \"a\"", true, alist.contains("a"));
-          Assert.assertEquals("Index for \"str1\" contains \"b\"", true, alist.contains("b"));
-          Assert.assertEquals("Index for \"str1\" contains \"c\"", false, alist.contains("c"));
-
-        } else if (key.equals("blah")) {
-          Assert.assertEquals("Index for \"str1\" contains \"a\"", false, alist.contains("a"));
-          Assert.assertEquals("Index for \"str1\" contains \"b\"", false, alist.contains("b"));
-          Assert.assertEquals("Index for \"str1\" contains \"c\"", true, alist.contains("c"));
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/algo/InvertIndexTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/InvertIndexTest.java b/library/src/test/java/com/datatorrent/lib/algo/InvertIndexTest.java
deleted file mode 100644
index 7de8e42..0000000
--- a/library/src/test/java/com/datatorrent/lib/algo/InvertIndexTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.Sink;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.algo.InvertIndex} <p>
- *
- */
-public class InvertIndexTest
-{
-  private static Logger log = LoggerFactory.getLogger(InvertIndexTest.class);
-
-  /**
-   * Test oper logic emits correct results
-   */
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    InvertIndex<String,String> oper = new InvertIndex<String,String>();
-    CollectorTestSink indexSink = new CollectorTestSink();
-
-    Sink inSink = oper.data.getSink();
-    oper.index.setSink(indexSink);
-
-    oper.beginWindow(0);
-
-    HashMap<String, String> input = new HashMap<String, String>();
-
-    input.put("a", "str");
-    input.put("b", "str");
-    inSink.put(input);
-
-    input.clear();
-    input.put("a", "str1");
-    input.put("b", "str1");
-    inSink.put(input);
-
-    input.clear();
-    input.put("c", "blah");
-    inSink.put(input);
-
-    input.clear();
-    input.put("c", "str1");
-    inSink.put(input);
-    oper.endWindow();
-
-    Assert.assertEquals("number emitted tuples", 3, indexSink.collectedTuples.size());
-    for (Object o: indexSink.collectedTuples) {
-      log.debug(o.toString());
-      HashMap<String, ArrayList<String>> output = (HashMap<String, ArrayList<String>>)o;
-      for (Map.Entry<String, ArrayList<String>> e: output.entrySet()) {
-        String key = e.getKey();
-        ArrayList<String> alist = e.getValue();
-        if (key.equals("str1")) {
-          Assert.assertEquals("Index for \"str1\" contains \"a\"", true, alist.contains("a"));
-          Assert.assertEquals("Index for \"str1\" contains \"b\"", true, alist.contains("b"));
-          Assert.assertEquals("Index for \"str1\" contains \"c\"", true, alist.contains("c"));
-
-        } else if (key.equals("str")) {
-          Assert.assertEquals("Index for \"str1\" contains \"a\"", true, alist.contains("a"));
-          Assert.assertEquals("Index for \"str1\" contains \"b\"", true, alist.contains("b"));
-          Assert.assertEquals("Index for \"str1\" contains \"c\"", false, alist.contains("c"));
-
-        } else if (key.equals("blah")) {
-          Assert.assertEquals("Index for \"str1\" contains \"a\"", false, alist.contains("a"));
-          Assert.assertEquals("Index for \"str1\" contains \"b\"", false, alist.contains("b"));
-          Assert.assertEquals("Index for \"str1\" contains \"c\"", true, alist.contains("c"));
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/algo/LastMatchMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/LastMatchMapTest.java b/library/src/test/java/com/datatorrent/lib/algo/LastMatchMapTest.java
deleted file mode 100644
index 87ec65a..0000000
--- a/library/src/test/java/com/datatorrent/lib/algo/LastMatchMapTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.HashMap;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.algo.LastMatchMap}<p>
- *
- */
-public class LastMatchMapTest
-{
-  /**
-   * Test node logic emits correct results
-   */
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    testNodeProcessingSchema(new LastMatchMap<String, Integer>());
-    testNodeProcessingSchema(new LastMatchMap<String, Double>());
-    testNodeProcessingSchema(new LastMatchMap<String, Float>());
-    testNodeProcessingSchema(new LastMatchMap<String, Short>());
-    testNodeProcessingSchema(new LastMatchMap<String, Long>());
-  }
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  public void testNodeProcessingSchema(LastMatchMap oper)
-  {
-    CountAndLastTupleTestSink matchSink = new CountAndLastTupleTestSink();
-    oper.last.setSink(matchSink);
-    oper.setKey("a");
-    oper.setValue(3);
-    oper.setTypeEQ();
-
-    oper.beginWindow(0);
-    HashMap<String, Number> input = new HashMap<String, Number>();
-    input.put("a", 4);
-    input.put("b", 20);
-    input.put("c", 1000);
-    oper.data.process(input);
-    input.put("a", 3);
-    input.put("b", 20);
-    input.put("c", 1000);
-    oper.data.process(input);
-    input.clear();
-    input.put("a", 2);
-    oper.data.process(input);
-    input.clear();
-    input.put("a", 4);
-    input.put("b", 21);
-    input.put("c", 1000);
-    oper.data.process(input);
-    input.clear();
-    input.put("a", 3);
-    input.put("b", 52);
-    input.put("c", 5);
-    oper.data.process(input);
-    oper.endWindow();
-
-    Assert.assertEquals("number emitted tuples", 1, matchSink.count);
-    HashMap<String, Number> tuple = (HashMap<String, Number>)matchSink.tuple;
-    Number aval = tuple.get("a");
-    Number bval = tuple.get("b");
-    Assert.assertEquals("Value of a was ", 3, aval.intValue());
-    Assert.assertEquals("Value of a was ", 52, bval.intValue());
-    matchSink.clear();
-
-    oper.beginWindow(0);
-    input.clear();
-    input.put("a", 2);
-    input.put("b", 20);
-    input.put("c", 1000);
-    oper.data.process(input);
-    input.clear();
-    input.put("a", 5);
-    oper.data.process(input);
-    oper.endWindow();
-    // There should be no emit as all tuples do not match
-    Assert.assertEquals("number emitted tuples", 0, matchSink.count);
-    matchSink.clear();
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/algo/LeastFrequentKeyMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/LeastFrequentKeyMapTest.java b/library/src/test/java/com/datatorrent/lib/algo/LeastFrequentKeyMapTest.java
deleted file mode 100644
index 8da56ed..0000000
--- a/library/src/test/java/com/datatorrent/lib/algo/LeastFrequentKeyMapTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.algo.LeastFrequentKeyMap}<p>
- *
- */
-public class LeastFrequentKeyMapTest
-{
-  /**
-   * Test node logic emits correct results
-   */
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    LeastFrequentKeyMap<String, Integer> oper = new LeastFrequentKeyMap<String, Integer>();
-    CountAndLastTupleTestSink matchSink = new CountAndLastTupleTestSink();
-    CountAndLastTupleTestSink listSink = new CountAndLastTupleTestSink();
-    oper.least.setSink(matchSink);
-    oper.list.setSink(listSink);
-
-    oper.beginWindow(0);
-    HashMap<String, Integer> amap = new HashMap<String, Integer>(1);
-    HashMap<String, Integer> bmap = new HashMap<String, Integer>(1);
-    HashMap<String, Integer> cmap = new HashMap<String, Integer>(1);
-    int atot = 5;
-    int btot = 3;
-    int ctot = 6;
-    amap.put("a", null);
-    bmap.put("b", null);
-    cmap.put("c", null);
-    for (int i = 0; i < atot; i++) {
-      oper.data.process(amap);
-    }
-    for (int i = 0; i < btot; i++) {
-      oper.data.process(bmap);
-    }
-    for (int i = 0; i < ctot; i++) {
-      oper.data.process(cmap);
-    }
-    oper.endWindow();
-    Assert.assertEquals("number emitted tuples", 1, matchSink.count);
-    HashMap<String, Integer> tuple = (HashMap<String, Integer>)matchSink.tuple;
-    Integer val = tuple.get("b");
-    Assert.assertEquals("Count of b was ", btot, val.intValue());
-    Assert.assertEquals("number emitted tuples", 1, listSink.count);
-    ArrayList<HashMap<String, Integer>> list = (ArrayList<HashMap<String, Integer>>)listSink.tuple;
-    val = list.get(0).get("b");
-    Assert.assertEquals("Count of b was ", btot, val.intValue());
-
-    matchSink.clear();
-    listSink.clear();
-    oper.beginWindow(0);
-    atot = 5;
-    btot = 10;
-    ctot = 5;
-    for (int i = 0; i < atot; i++) {
-      oper.data.process(amap);
-    }
-    for (int i = 0; i < btot; i++) {
-      oper.data.process(bmap);
-    }
-    for (int i = 0; i < ctot; i++) {
-      oper.data.process(cmap);
-    }
-    oper.endWindow();
-    Assert.assertEquals("number emitted tuples", 1, matchSink.count);
-    Assert.assertEquals("number emitted tuples", 1, listSink.count);
-    list = (ArrayList<HashMap<String, Integer>>)listSink.tuple;
-    int acount = 0;
-    int ccount = 0;
-    for (HashMap<String, Integer> h : list) {
-      val = h.get("a");
-      if (val == null) {
-        ccount = h.get("c");
-      } else {
-        acount = val;
-      }
-    }
-    Assert.assertEquals("Count of a was ", atot, acount);
-    Assert.assertEquals("Count of c was ", ctot, ccount);
-    HashMap<String, Integer> mtuple = (HashMap<String, Integer>)matchSink.tuple;
-    val = mtuple.get("a");
-    if (val == null) {
-      val = mtuple.get("c");
-    }
-    Assert.assertEquals("Count of least frequent key was ", ctot, val.intValue());
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/algo/LeastFrequentKeyValueMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/LeastFrequentKeyValueMapTest.java b/library/src/test/java/com/datatorrent/lib/algo/LeastFrequentKeyValueMapTest.java
deleted file mode 100644
index 955a901..0000000
--- a/library/src/test/java/com/datatorrent/lib/algo/LeastFrequentKeyValueMapTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Assert;
-
-import org.junit.Test;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.algo.LeastFrequentKeyValueMap}<p>
- *
- */
-public class LeastFrequentKeyValueMapTest
-{
-  /**
-   * Test node logic emits correct results
-   */
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    LeastFrequentKeyValueMap<String, Integer> oper = new LeastFrequentKeyValueMap<String, Integer>();
-    CollectorTestSink matchSink = new CollectorTestSink();
-    oper.least.setSink(matchSink);
-
-    oper.beginWindow(0);
-    HashMap<String, Integer> amap = new HashMap<String, Integer>(1);
-    HashMap<String, Integer> bmap = new HashMap<String, Integer>(1);
-    HashMap<String, Integer> cmap = new HashMap<String, Integer>(1);
-    int atot1 = 5;
-    int btot1 = 3;
-    int ctot1 = 6;
-    amap.put("a", 1);
-    bmap.put("b", 2);
-    cmap.put("c", 4);
-    for (int i = 0; i < atot1; i++) {
-      oper.data.process(amap);
-    }
-    for (int i = 0; i < btot1; i++) {
-      oper.data.process(bmap);
-    }
-    for (int i = 0; i < ctot1; i++) {
-      oper.data.process(cmap);
-    }
-
-    atot1 = 4;
-    btot1 = 3;
-    ctot1 = 10;
-    amap.put("a", 5);
-    bmap.put("b", 4);
-    cmap.put("c", 3);
-    for (int i = 0; i < atot1; i++) {
-      oper.data.process(amap);
-    }
-    for (int i = 0; i < btot1; i++) {
-      oper.data.process(bmap);
-    }
-    for (int i = 0; i < ctot1; i++) {
-      oper.data.process(cmap);
-    }
-
-    oper.endWindow();
-    Assert.assertEquals("number emitted tuples", 3, matchSink.collectedTuples.size());
-    int vcount;
-    for (Object o: matchSink.collectedTuples) {
-      HashMap<String, HashMap<Integer, Integer>> omap = (HashMap<String, HashMap<Integer, Integer>>)o;
-      for (Map.Entry<String, HashMap<Integer, Integer>> e: omap.entrySet()) {
-        String key = e.getKey();
-        if (key.equals("a")) {
-          vcount = e.getValue().get(5);
-          Assert.assertEquals("Key \"a\" has value ", 4, vcount);
-
-        } else if (key.equals("b")) {
-          vcount = e.getValue().get(2);
-          Assert.assertEquals("Key \"a\" has value ", 3, vcount);
-          vcount = e.getValue().get(4);
-          Assert.assertEquals("Key \"a\" has value ", 3, vcount);
-
-        } else if (key.equals("c")) {
-          vcount = e.getValue().get(4);
-          Assert.assertEquals("Key \"a\" has value ", 6, vcount);
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/algo/MatchAllMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/MatchAllMapTest.java b/library/src/test/java/com/datatorrent/lib/algo/MatchAllMapTest.java
index 15e7b3a..047acb6 100644
--- a/library/src/test/java/com/datatorrent/lib/algo/MatchAllMapTest.java
+++ b/library/src/test/java/com/datatorrent/lib/algo/MatchAllMapTest.java
@@ -26,10 +26,11 @@ import org.junit.Test;
 import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
 
 /**
- *
+ * @deprecated
  * Functional tests for {@link com.datatorrent.lib.algo.MatchAllMap}<p>
- *
+ * (Deprecating inclass) Comment: MatchAllMap is deprecated.
  */
+@Deprecated
 public class MatchAllMapTest
 {
   /**

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/algo/MatchAnyMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/MatchAnyMapTest.java b/library/src/test/java/com/datatorrent/lib/algo/MatchAnyMapTest.java
index 5dfd9ea..12e3c76 100644
--- a/library/src/test/java/com/datatorrent/lib/algo/MatchAnyMapTest.java
+++ b/library/src/test/java/com/datatorrent/lib/algo/MatchAnyMapTest.java
@@ -21,16 +21,16 @@ package com.datatorrent.lib.algo;
 import java.util.HashMap;
 
 import org.junit.Assert;
-
 import org.junit.Test;
 
 import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
 
 /**
- *
+ * @deprecated
  * Functional tests for {@link com.datatorrent.lib.algo.MatchAnyMap}<p>
- *
+ * (Deprecating inclass) Comment: MatchAnyMap is deprecated.
  */
+@Deprecated
 public class MatchAnyMapTest
 {
   /**

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/algo/MatchMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/MatchMapTest.java b/library/src/test/java/com/datatorrent/lib/algo/MatchMapTest.java
deleted file mode 100644
index ecd5e9b..0000000
--- a/library/src/test/java/com/datatorrent/lib/algo/MatchMapTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.algo.MatchMap}<p>
- *
- */
-public class MatchMapTest
-{
-  /**
-   * Test node logic emits correct results
-   */
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    testNodeProcessingSchema(new MatchMap<String, Integer>());
-    testNodeProcessingSchema(new MatchMap<String, Double>());
-    testNodeProcessingSchema(new MatchMap<String, Float>());
-    testNodeProcessingSchema(new MatchMap<String, Short>());
-    testNodeProcessingSchema(new MatchMap<String, Long>());
-  }
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  public void testNodeProcessingSchema(MatchMap oper)
-  {
-    CountAndLastTupleTestSink matchSink = new CountAndLastTupleTestSink();
-    oper.match.setSink(matchSink);
-    oper.setKey("a");
-    oper.setValue(3.0);
-    oper.setTypeNEQ();
-
-    oper.beginWindow(0);
-    HashMap<String, Number> input = new HashMap<String, Number>();
-    input.put("a", 2);
-    input.put("b", 20);
-    input.put("c", 1000);
-    oper.data.process(input);
-    input.clear();
-    input.put("a", 3);
-    oper.data.process(input);
-    oper.endWindow();
-
-    // One for each key
-    Assert.assertEquals("number emitted tuples", 1, matchSink.count);
-    for (Map.Entry<String, Number> e : ((HashMap<String, Number>)matchSink.tuple).entrySet()) {
-      if (e.getKey().equals("a")) {
-        Assert.assertEquals("emitted value for 'a' was ", new Double(2), new Double(e.getValue().doubleValue()));
-      } else if (e.getKey().equals("b")) {
-        Assert.assertEquals("emitted tuple for 'b' was ", new Double(20), new Double(e.getValue().doubleValue()));
-      } else if (e.getKey().equals("c")) {
-        Assert.assertEquals("emitted tuple for 'c' was ", new Double(1000), new Double(e.getValue().doubleValue()));
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/algo/MergeSortNumberTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/MergeSortNumberTest.java b/library/src/test/java/com/datatorrent/lib/algo/MergeSortNumberTest.java
index 4701957..b9991bd 100644
--- a/library/src/test/java/com/datatorrent/lib/algo/MergeSortNumberTest.java
+++ b/library/src/test/java/com/datatorrent/lib/algo/MergeSortNumberTest.java
@@ -29,9 +29,11 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
 import static org.junit.Assert.assertTrue;
 
 /**
- *
+ * @deprecated
  * Functional tests for {@link com.datatorrent.lib.algo.MergeSort}<p>
+ *   (Deprecating inclass) Comment: MergeSortNumber is deprecated.
  */
+@Deprecated
 public class MergeSortNumberTest
 {
   /**

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/algo/MostFrequentKeyMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/MostFrequentKeyMapTest.java b/library/src/test/java/com/datatorrent/lib/algo/MostFrequentKeyMapTest.java
deleted file mode 100644
index 6851e4a..0000000
--- a/library/src/test/java/com/datatorrent/lib/algo/MostFrequentKeyMapTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import org.junit.Assert;
-
-import org.junit.Test;
-
-import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.algo.MostFrequentKeyMap}<p>
- *
- */
-public class MostFrequentKeyMapTest
-{
-  /**
-   * Test node logic emits correct results
-   */
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    MostFrequentKeyMap<String, Integer> oper = new MostFrequentKeyMap<String, Integer>();
-    CountAndLastTupleTestSink matchSink = new CountAndLastTupleTestSink();
-    CountAndLastTupleTestSink listSink = new CountAndLastTupleTestSink();
-    oper.most.setSink(matchSink);
-    oper.list.setSink(listSink);
-
-    oper.beginWindow(0);
-    HashMap<String, Integer> amap = new HashMap<String, Integer>(1);
-    HashMap<String, Integer> bmap = new HashMap<String, Integer>(1);
-    HashMap<String, Integer> cmap = new HashMap<String, Integer>(1);
-    int atot = 5;
-    int btot = 7;
-    int ctot = 6;
-    amap.put("a", null);
-    bmap.put("b", null);
-    cmap.put("c", null);
-    for (int i = 0; i < atot; i++) {
-      oper.data.process(amap);
-    }
-    for (int i = 0; i < btot; i++) {
-      oper.data.process(bmap);
-    }
-    for (int i = 0; i < ctot; i++) {
-      oper.data.process(cmap);
-    }
-    oper.endWindow();
-    Assert.assertEquals("number emitted tuples", 1, matchSink.count);
-    HashMap<String, Integer> tuple = (HashMap<String, Integer>)matchSink.tuple;
-    Integer val = tuple.get("b");
-    Assert.assertEquals("Count of b was ", btot, val.intValue());
-    Assert.assertEquals("number emitted tuples", 1, listSink.count);
-    ArrayList<HashMap<String, Integer>> list = (ArrayList<HashMap<String, Integer>>)listSink.tuple;
-    val = list.get(0).get("b");
-    Assert.assertEquals("Count of b was ", btot, val.intValue());
-
-    matchSink.clear();
-    listSink.clear();
-    oper.beginWindow(0);
-    atot = 5;
-    btot = 4;
-    ctot = 5;
-    for (int i = 0; i < atot; i++) {
-      oper.data.process(amap);
-    }
-    for (int i = 0; i < btot; i++) {
-      oper.data.process(bmap);
-    }
-    for (int i = 0; i < ctot; i++) {
-      oper.data.process(cmap);
-    }
-    oper.endWindow();
-    Assert.assertEquals("number emitted tuples", 1, matchSink.count);
-    Assert.assertEquals("number emitted tuples", 1, listSink.count);
-    list = (ArrayList<HashMap<String, Integer>>)listSink.tuple;
-    int acount = 0;
-    int ccount = 0;
-    for (HashMap<String, Integer> h : list) {
-      val = h.get("a");
-      if (val == null) {
-        ccount = h.get("c");
-      } else {
-        acount = val;
-      }
-    }
-    Assert.assertEquals("Count of a was ", atot, acount);
-    Assert.assertEquals("Count of c was ", ctot, ccount);
-    HashMap<String, Integer> mtuple = (HashMap<String, Integer>)matchSink.tuple;
-    val = mtuple.get("a");
-    if (val == null) {
-      val = mtuple.get("c");
-    }
-    Assert.assertEquals("Count of least frequent key was ", ctot, val.intValue());
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/algo/MostFrequentKeyValueMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/MostFrequentKeyValueMapTest.java b/library/src/test/java/com/datatorrent/lib/algo/MostFrequentKeyValueMapTest.java
deleted file mode 100644
index 5a72a5c..0000000
--- a/library/src/test/java/com/datatorrent/lib/algo/MostFrequentKeyValueMapTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Assert;
-
-import org.junit.Test;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.algo.MostFrequentKeyValueMap}<p>
- *
- */
-public class MostFrequentKeyValueMapTest
-{
-  /**
-   * Test node logic emits correct results
-   */
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    MostFrequentKeyValueMap<String, Integer> oper = new MostFrequentKeyValueMap<String, Integer>();
-    CollectorTestSink matchSink = new CollectorTestSink();
-    oper.most.setSink(matchSink);
-
-    oper.beginWindow(0);
-    HashMap<String, Integer> amap = new HashMap<String, Integer>(1);
-    HashMap<String, Integer> bmap = new HashMap<String, Integer>(1);
-    HashMap<String, Integer> cmap = new HashMap<String, Integer>(1);
-    int atot1 = 5;
-    int btot1 = 3;
-    int ctot1 = 6;
-    amap.put("a", 1);
-    bmap.put("b", 2);
-    cmap.put("c", 4);
-    for (int i = 0; i < atot1; i++) {
-      oper.data.process(amap);
-    }
-    for (int i = 0; i < btot1; i++) {
-      oper.data.process(bmap);
-    }
-    for (int i = 0; i < ctot1; i++) {
-      oper.data.process(cmap);
-    }
-
-    atot1 = 4;
-    btot1 = 3;
-    ctot1 = 10;
-    amap.put("a", 5);
-    bmap.put("b", 4);
-    cmap.put("c", 3);
-    for (int i = 0; i < atot1; i++) {
-      oper.data.process(amap);
-    }
-    for (int i = 0; i < btot1; i++) {
-      oper.data.process(bmap);
-    }
-    for (int i = 0; i < ctot1; i++) {
-      oper.data.process(cmap);
-    }
-
-    oper.endWindow();
-    Assert.assertEquals("number emitted tuples", 3, matchSink.collectedTuples.size());
-    int vcount;
-    for (Object o: matchSink.collectedTuples) {
-      HashMap<String, HashMap<Integer, Integer>> omap = (HashMap<String, HashMap<Integer, Integer>>)o;
-      for (Map.Entry<String, HashMap<Integer, Integer>> e: omap.entrySet()) {
-        String key = e.getKey();
-        if (key.equals("a")) {
-          vcount = e.getValue().get(1);
-          Assert.assertEquals("Key \"a\" has value ", 5, vcount);
-
-        } else if (key.equals("b")) {
-          vcount = e.getValue().get(2);
-          Assert.assertEquals("Key \"a\" has value ", 3, vcount);
-          vcount = e.getValue().get(4);
-          Assert.assertEquals("Key \"a\" has value ", 3, vcount);
-
-        } else if (key.equals("c")) {
-          vcount = e.getValue().get(3);
-          Assert.assertEquals("Key \"a\" has value ", 10, vcount);
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/algo/SamplerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/SamplerTest.java b/library/src/test/java/com/datatorrent/lib/algo/SamplerTest.java
deleted file mode 100644
index 10a7f3d..0000000
--- a/library/src/test/java/com/datatorrent/lib/algo/SamplerTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import org.junit.Assert;
-
-import org.junit.Test;
-
-import com.datatorrent.lib.testbench.CountTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.algo.Sampler}<p>
- *
- */
-public class SamplerTest
-{
-  /**
-   * Test node logic emits correct results
-   */
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    Sampler<String> oper = new Sampler<String>();
-    CountTestSink sink = new CountTestSink<String>();
-    oper.sample.setSink(sink);
-    oper.setSamplingPercentage(.1);
-
-    String tuple = "a";
-
-
-    int numTuples = 10000;
-    oper.beginWindow(0);
-    for (int i = 0; i < numTuples; i++) {
-      oper.data.process(tuple);
-    }
-
-    oper.endWindow();
-    int lowerlimit = 5;
-    int upperlimit = 15;
-    int actual = (100 * sink.count) / numTuples;
-
-    Assert.assertEquals("number emitted tuples", true, lowerlimit < actual);
-    Assert.assertEquals("number emitted tuples", true, upperlimit > actual);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/algo/TopNUniqueTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/algo/TopNUniqueTest.java b/library/src/test/java/com/datatorrent/lib/algo/TopNUniqueTest.java
index 601cb3b..8181b2d 100644
--- a/library/src/test/java/com/datatorrent/lib/algo/TopNUniqueTest.java
+++ b/library/src/test/java/com/datatorrent/lib/algo/TopNUniqueTest.java
@@ -30,10 +30,11 @@ import org.slf4j.LoggerFactory;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 
 /**
- *
+ * @deprecated
  * Functional tests for {@link com.datatorrent.lib.algo.TopNUnique}<p>
- *
+ * (Deprecating inclass) Comment: TopNUnique is deprecated.
  */
+@Deprecated
 public class TopNUniqueTest
 {
   private static Logger log = LoggerFactory.getLogger(TopNUniqueTest.class);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/join/AntiJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/join/AntiJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/join/AntiJoinOperatorTest.java
index 4241a80..e71074e 100644
--- a/library/src/test/java/com/datatorrent/lib/join/AntiJoinOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/join/AntiJoinOperatorTest.java
@@ -19,6 +19,7 @@
 package com.datatorrent.lib.join;
 
 import java.util.HashMap;
+
 import org.junit.Assert;
 import org.junit.Test;