You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/05/18 20:41:53 UTC

[03/22] incubator-apex-malhar git commit: APEXMALHAR-2095 removed checkstyle violations of malhar library module

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/multiwindow/SortedMovingWindowTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/multiwindow/SortedMovingWindowTest.java b/library/src/test/java/com/datatorrent/lib/multiwindow/SortedMovingWindowTest.java
index 59b2c4d..10e0f5e 100644
--- a/library/src/test/java/com/datatorrent/lib/multiwindow/SortedMovingWindowTest.java
+++ b/library/src/test/java/com/datatorrent/lib/multiwindow/SortedMovingWindowTest.java
@@ -23,14 +23,16 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.commons.lang.ObjectUtils.Null;
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.datatorrent.lib.testbench.CollectorTestSink;
+import org.apache.commons.lang.ObjectUtils.Null;
+
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
 /**
  * A unit test to test SortedMovingWindow operator can either:
  * 1. sort simple comparable tuples
@@ -45,7 +47,8 @@ public class SortedMovingWindowTest
    * Test sorting simple comparable tuples within the sliding window
    */
   @Test
-  public void testSortingSimpleNumberTuple(){
+  public void testSortingSimpleNumberTuple()
+  {
     SortedMovingWindow<Integer, Null> smw = new SortedMovingWindow<Integer, Null>();
     CollectorTestSink<Object> testSink = new CollectorTestSink<Object>();
     smw.outputPort.setSink(testSink);
@@ -73,9 +76,10 @@ public class SortedMovingWindowTest
 
     SortedMovingWindow<Map<String, Integer>, Null> smw = new SortedMovingWindow<Map<String, Integer>, Null>();
 
-    final String[] keys = { "number" };
+    final String[] keys = {"number"};
 
-    smw.setComparator(new Comparator<Map<String, Integer>>() {
+    smw.setComparator(new Comparator<Map<String, Integer>>()
+    {
       @Override
       public int compare(Map<String, Integer> o1, Map<String, Integer> o2)
       {
@@ -89,15 +93,17 @@ public class SortedMovingWindowTest
     smw.setWindowSize(2);
 
     // The incoming 6 simple map tuples are disordered among 4 windows 
-    emitObjects(smw, new Map[][] { createHashMapTuples(keys, new Integer[][] { { 1 }, { 3 } }), createHashMapTuples(keys, new Integer[][] { { 2 }, { 5 } }), 
-        createHashMapTuples(keys, new Integer[][] { { 4 } }), createHashMapTuples(keys, new Integer[][] { { 6 } }) });
+    emitObjects(smw, new Map[][]{createHashMapTuples(keys, new Integer[][]{{1}, {3}}),
+        createHashMapTuples(keys, new Integer[][]{{2}, {5}}),
+        createHashMapTuples(keys, new Integer[][]{{4}}), createHashMapTuples(keys, new Integer[][]{{6}})});
     smw.beginWindow(4);
     smw.endWindow();
     smw.beginWindow(5);
     smw.endWindow();
 
     // The outcome is ordered by the value of the key "number"
-    Assert.assertEquals(Arrays.asList(createHashMapTuples(keys, new Integer[][] { { 1 }, { 2 }, { 3 }, { 4 }, { 5 }, { 6 } })), testSink.collectedTuples);
+    Assert.assertEquals(Arrays.asList(createHashMapTuples(keys, new Integer[][]{{1}, {2}, {3}, {4}, {5}, {6}})),
+        testSink.collectedTuples);
   }
   
   
@@ -110,9 +116,10 @@ public class SortedMovingWindowTest
 
     SortedMovingWindow<Map<String, Object>, String> smw = new SortedMovingWindow<Map<String, Object>, String>();
 
-    final String[] keys = { "name", "number" };
+    final String[] keys = {"name", "number"};
 
-    smw.setComparator(new Comparator<Map<String, Object>>() {
+    smw.setComparator(new Comparator<Map<String, Object>>()
+    {
       @Override
       public int compare(Map<String, Object> o1, Map<String, Object> o2)
       {
@@ -121,12 +128,13 @@ public class SortedMovingWindowTest
       }
     });
     
-    smw.setFunction(new Function<Map<String,Object>, String>() {
+    smw.setFunction(new Function<Map<String,Object>, String>()
+    {
       @Override
       public String apply(Map<String, Object> input)
       {
         // order tuple with same key "name"
-        return (String) input.get(keys[0]);
+        return (String)input.get(keys[0]);
       }
     });
     CollectorTestSink<Object> testSink = new CollectorTestSink<Object>();
@@ -135,23 +143,23 @@ public class SortedMovingWindowTest
     smw.setWindowSize(2);
 
     // The incoming 9 complex map tuples are disordered with same name among 4 windows 
-    emitObjects(smw, new Map[][] { createHashMapTuples(keys, new Object[][] { {"bob", 1 }, {"jim", 1 } }), createHashMapTuples(keys, new Object[][] { {"jim", 2 }, { "bob", 3 } }), 
-        createHashMapTuples(keys, new Object[][] { { "bob", 2 }, { "jim", 4} }), createHashMapTuples(keys, new Object[][] { {"bob", 5}, {"jim", 3 }, {"bob", 4} }) });
+    emitObjects(smw, new Map[][]{createHashMapTuples(keys, new Object[][]{{"bob", 1}, {"jim", 1}}),
+        createHashMapTuples(keys, new Object[][]{{"jim", 2}, {"bob", 3}}),
+        createHashMapTuples(keys, new Object[][]{{"bob", 2}, {"jim", 4}}),
+        createHashMapTuples(keys, new Object[][]{{"bob", 5}, {"jim", 3}, {"bob", 4}})});
     smw.beginWindow(4);
     smw.endWindow();
     smw.beginWindow(5);
     smw.endWindow();
 
     // All tuples with same "name" are sorted by key "number"
-    Assert.assertEquals(Arrays.asList(createHashMapTuples(keys, new Object[][] { { "bob", 1 }, { "jim", 1 }, { "jim", 2 }, { "bob", 2 },
-        { "bob", 3 }, { "jim", 3 }, { "jim", 4 }, { "bob", 4 }, { "bob", 5 } })), testSink.collectedTuples);
+    Assert.assertEquals(Arrays.asList(createHashMapTuples(keys,
+        new Object[][]{{"bob", 1}, {"jim", 1}, {"jim", 2}, {"bob", 2}, {"bob", 3}, {"jim", 3}, {"jim", 4}, {"bob", 4}, {"bob", 5}})), testSink.collectedTuples);
   }
   
-  
-  
-  
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  private void emitObjects(SortedMovingWindow win, Object[][] obj){
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private void emitObjects(SortedMovingWindow win, Object[][] obj)
+  {
     for (int i = 0; i < obj.length; i++) {
       win.beginWindow(i);
       for (int j = 0; j < obj[i].length; j++) {
@@ -161,8 +169,9 @@ public class SortedMovingWindowTest
     }
   }
   
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  private Map[] createHashMapTuples(String[] cols, Object[][] values){
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private Map[] createHashMapTuples(String[] cols, Object[][] values)
+  {
     
     HashMap[] maps = new HashMap[values.length];
     int index = -1;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/partitioner/StatelessPartitionerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/partitioner/StatelessPartitionerTest.java b/library/src/test/java/com/datatorrent/lib/partitioner/StatelessPartitionerTest.java
index 3a6f7fe..8d52f7e 100644
--- a/library/src/test/java/com/datatorrent/lib/partitioner/StatelessPartitionerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/partitioner/StatelessPartitionerTest.java
@@ -21,20 +21,21 @@ package com.datatorrent.lib.partitioner;
 import java.util.Collection;
 import java.util.List;
 
-import com.google.common.collect.Lists;
-
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.datatorrent.api.*;
+import com.google.common.collect.Lists;
+
 import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.Partitioner;
 import com.datatorrent.api.Partitioner.Partition;
 import com.datatorrent.api.StringCodec.Object2String;
-
-import com.datatorrent.lib.util.TestUtils;
-
 import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.util.TestUtils;
 
 public class StatelessPartitionerTest
 {
@@ -101,7 +102,7 @@ public class StatelessPartitionerTest
     Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 0));
     Assert.assertEquals("Incorrect number of partitions", 1, newPartitions.size());
 
-    for(Partition<DummyOperator> partition: newPartitions) {
+    for (Partition<DummyOperator> partition : newPartitions) {
       Assert.assertEquals("Incorrect cloned value", 5, partition.getPartitionedInstance().getValue());
     }
   }
@@ -119,7 +120,7 @@ public class StatelessPartitionerTest
     Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 0));
     Assert.assertEquals("Incorrect number of partitions", 5, newPartitions.size());
 
-    for(Partition<DummyOperator> partition: newPartitions) {
+    for (Partition<DummyOperator> partition : newPartitions) {
       Assert.assertEquals("Incorrect cloned value", 5, partition.getPartitionedInstance().getValue());
     }
   }
@@ -149,7 +150,7 @@ public class StatelessPartitionerTest
     partitions.add(mockPartition);
 
     Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions,
-      new PartitioningContextImpl(null, 5));
+        new PartitioningContextImpl(null, 5));
     Assert.assertEquals("after partition", 5, newPartitions.size());
   }
 
@@ -172,7 +173,7 @@ public class StatelessPartitionerTest
     }
 
     Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions,
-      new PartitioningContextImpl(null, 1));
+        new PartitioningContextImpl(null, 1));
     Assert.assertEquals("after partition", 1, newPartitions.size());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/partitioner/StatelessThroughputBasedPartitionerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/partitioner/StatelessThroughputBasedPartitionerTest.java b/library/src/test/java/com/datatorrent/lib/partitioner/StatelessThroughputBasedPartitionerTest.java
index 16647ab..6eaebc3 100644
--- a/library/src/test/java/com/datatorrent/lib/partitioner/StatelessThroughputBasedPartitionerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/partitioner/StatelessThroughputBasedPartitionerTest.java
@@ -18,7 +18,6 @@
  */
 package com.datatorrent.lib.partitioner;
 
-import com.datatorrent.api.*;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +28,13 @@ import org.junit.Test;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.StatsListener;
 import com.datatorrent.lib.util.TestUtils;
 
 /**
@@ -150,8 +156,8 @@ public class StatelessThroughputBasedPartitionerTest
     partitions.clear();
     partitions.add(mockPartition);
 
-    Collection<Partitioner.Partition<DummyOperator>> newPartitions = statelessLatencyBasedPartitioner.definePartitions(partitions,
-                                                                                                                                                            new StatelessPartitionerTest.PartitioningContextImpl(ports, 5));
+    Collection<Partitioner.Partition<DummyOperator>> newPartitions = statelessLatencyBasedPartitioner.definePartitions(
+        partitions, new StatelessPartitionerTest.PartitioningContextImpl(ports, 5));
     Assert.assertEquals("after partition", 2, newPartitions.size());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/script/JavaScriptOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/script/JavaScriptOperatorTest.java b/library/src/test/java/com/datatorrent/lib/script/JavaScriptOperatorTest.java
index f3f69fb..3ad30df 100644
--- a/library/src/test/java/com/datatorrent/lib/script/JavaScriptOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/script/JavaScriptOperatorTest.java
@@ -56,7 +56,7 @@ public class JavaScriptOperatorTest
     // Validate value.
     Assert.assertEquals("number emitted tuples", 1, sink.collectedTuples.size());
     for (Object o : sink.collectedTuples) { // count is 12
-      Assert.assertEquals("4.0 is expected", (Double) o, 4.0, 0);
+      Assert.assertEquals("4.0 is expected", (Double)o, 4.0, 0);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/statistics/MeridianOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/statistics/MeridianOperatorTest.java b/library/src/test/java/com/datatorrent/lib/statistics/MeridianOperatorTest.java
index 1cf89ff..47fa2c2 100644
--- a/library/src/test/java/com/datatorrent/lib/statistics/MeridianOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/statistics/MeridianOperatorTest.java
@@ -18,12 +18,12 @@
  */
 package com.datatorrent.lib.statistics;
 
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
-
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
 /**
  * Functional Test for {@link com.datatorrent.lib.statistics.WeightedMeanOperator}. <br>
  */

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/statistics/ModeOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/statistics/ModeOperatorTest.java b/library/src/test/java/com/datatorrent/lib/statistics/ModeOperatorTest.java
index 158fed5..26e94c6 100644
--- a/library/src/test/java/com/datatorrent/lib/statistics/ModeOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/statistics/ModeOperatorTest.java
@@ -18,12 +18,12 @@
  */
 package com.datatorrent.lib.statistics;
 
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
-
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
 /**
  * Functional Test for {@link com.datatorrent.lib.statistics.WeightedMeanOperator}. <br>
  */

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/statistics/WeightedMeanOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/statistics/WeightedMeanOperatorTest.java b/library/src/test/java/com/datatorrent/lib/statistics/WeightedMeanOperatorTest.java
index e3788c2..f9589db 100644
--- a/library/src/test/java/com/datatorrent/lib/statistics/WeightedMeanOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/statistics/WeightedMeanOperatorTest.java
@@ -18,12 +18,12 @@
  */
 package com.datatorrent.lib.statistics;
 
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
-
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
 /**
  * Functional Test for {@link com.datatorrent.lib.statistics.WeightedMeanOperator}. <br>
  */

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/ArrayListAggregatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/stream/ArrayListAggregatorTest.java b/library/src/test/java/com/datatorrent/lib/stream/ArrayListAggregatorTest.java
index 76983f5..13d6097 100644
--- a/library/src/test/java/com/datatorrent/lib/stream/ArrayListAggregatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/stream/ArrayListAggregatorTest.java
@@ -30,36 +30,35 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
  */
 public class ArrayListAggregatorTest
 {
-	@SuppressWarnings({ "rawtypes", "unchecked" })
+  @SuppressWarnings({ "rawtypes", "unchecked" })
   @Test
-	public void testNodeProcessing() throws Exception
-	{
-		ArrayListAggregator<Integer> oper = new ArrayListAggregator<Integer>();
-		CollectorTestSink cSink = new CollectorTestSink();
+  public void testNodeProcessing() throws Exception
+  {
+    ArrayListAggregator<Integer> oper = new ArrayListAggregator<Integer>();
+    CollectorTestSink cSink = new CollectorTestSink();
 
-		oper.output.setSink(cSink);
-		oper.setSize(10);
-		int numtuples = 100;
+    oper.output.setSink(cSink);
+    oper.setSize(10);
+    int numtuples = 100;
 
-		oper.beginWindow(0);
-		for (int i = 0; i < numtuples; i++) {
-			oper.input.process(i);
-		}
-		oper.endWindow();
-		Assert.assertEquals("number emitted tuples", 10,
-				cSink.collectedTuples.size());
+    oper.beginWindow(0);
+    for (int i = 0; i < numtuples; i++) {
+      oper.input.process(i);
+    }
+    oper.endWindow();
+    Assert.assertEquals("number emitted tuples", 10,
+        cSink.collectedTuples.size());
 
-		cSink.clear();
-		oper.setSize(0);
+    cSink.clear();
+    oper.setSize(0);
 
-		oper.beginWindow(1);
-		for (int i = 0; i < numtuples; i++) {
-			oper.input.process(i);
-		}
-		oper.endWindow();
-		Assert.assertEquals("number emitted tuples", 1,
-				cSink.collectedTuples.size());
-		ArrayList<?> list = (ArrayList<?>) cSink.collectedTuples.get(0);
-		Assert.assertEquals("number emitted tuples", numtuples, list.size());
-	}
+    oper.beginWindow(1);
+    for (int i = 0; i < numtuples; i++) {
+      oper.input.process(i);
+    }
+    oper.endWindow();
+    Assert.assertEquals("number emitted tuples", 1, cSink.collectedTuples.size());
+    ArrayList<?> list = (ArrayList<?>)cSink.collectedTuples.get(0);
+    Assert.assertEquals("number emitted tuples", numtuples, list.size());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/ArrayListToItemTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/stream/ArrayListToItemTest.java b/library/src/test/java/com/datatorrent/lib/stream/ArrayListToItemTest.java
index fe0edc8..6f1504d 100644
--- a/library/src/test/java/com/datatorrent/lib/stream/ArrayListToItemTest.java
+++ b/library/src/test/java/com/datatorrent/lib/stream/ArrayListToItemTest.java
@@ -30,29 +30,29 @@ import com.datatorrent.lib.testbench.CountTestSink;
  * Benchmarks: Currently does about ?? Million tuples/sec in debugging environment. Need to test on larger nodes<br>
  * <br>
  */
-public class ArrayListToItemTest {
-	
-    /**
-     * Test operator pass through. The Object passed is not relevant
-     */
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-		@Test
-    public void testNodeProcessing() throws Exception
-    {
-      ArrayListToItem oper = new ArrayListToItem();
-      CountTestSink itemSink = new CountTestSink();
-      oper.item.setSink(itemSink);
+public class ArrayListToItemTest
+{
+  /**
+   * Test operator pass through. The Object passed is not relevant
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    ArrayListToItem oper = new ArrayListToItem();
+    CountTestSink itemSink = new CountTestSink();
+    oper.item.setSink(itemSink);
 
-      oper.beginWindow(0);
-      ArrayList<String> input = new ArrayList<String>();
-      input.add("a");
-      // Same input object can be used as the oper is just pass through
-      int numtuples = 1000;
-      for (int i = 0; i < numtuples; i++) {
-        oper.data.process(input);
-      }
-
-      oper.endWindow();
-      Assert.assertEquals("number emitted tuples", numtuples, itemSink.count);
+    oper.beginWindow(0);
+    ArrayList<String> input = new ArrayList<String>();
+    input.add("a");
+    // Same input object can be used as the oper is just pass through
+    int numtuples = 1000;
+    for (int i = 0; i < numtuples; i++) {
+      oper.data.process(input);
     }
+
+    oper.endWindow();
+    Assert.assertEquals("number emitted tuples", numtuples, itemSink.count);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/ConsolidatorKeyValTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/stream/ConsolidatorKeyValTest.java b/library/src/test/java/com/datatorrent/lib/stream/ConsolidatorKeyValTest.java
index 697ae34..de419fd 100644
--- a/library/src/test/java/com/datatorrent/lib/stream/ConsolidatorKeyValTest.java
+++ b/library/src/test/java/com/datatorrent/lib/stream/ConsolidatorKeyValTest.java
@@ -32,25 +32,25 @@ import com.datatorrent.lib.util.KeyValPair;
  */
 public class ConsolidatorKeyValTest
 {
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	@Test
-	public void testNodeProcessing() throws Exception
-	{
-		ConsolidatorKeyVal<String, Integer, Double, Integer, Integer, Integer> oper =
-				new ConsolidatorKeyVal<String, Integer, Double, Integer, Integer, Integer>();
-		CollectorTestSink cSink = new CollectorTestSink();
-		oper.out.setSink(cSink);
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    ConsolidatorKeyVal<String, Integer, Double, Integer, Integer, Integer> oper =
+        new ConsolidatorKeyVal<String, Integer, Double, Integer, Integer, Integer>();
+    CollectorTestSink cSink = new CollectorTestSink();
+    oper.out.setSink(cSink);
 
-		oper.beginWindow(0);
-		KeyValPair<String, Integer> m1 = new KeyValPair<String, Integer>("a",1);
-		oper.in1.process(m1);
-		KeyValPair<String, Double> m2 = new KeyValPair<String, Double>("a",1.0);
-		oper.in2.process(m2);
-		oper.endWindow();
-		Assert.assertEquals("number emitted tuples", 1, cSink.collectedTuples.size());
+    oper.beginWindow(0);
+    KeyValPair<String, Integer> m1 = new KeyValPair<String, Integer>("a",1);
+    oper.in1.process(m1);
+    KeyValPair<String, Double> m2 = new KeyValPair<String, Double>("a",1.0);
+    oper.in2.process(m2);
+    oper.endWindow();
+    Assert.assertEquals("number emitted tuples", 1, cSink.collectedTuples.size());
 
 
-		HashMap<String, ArrayList<Object>> map = (HashMap<String, ArrayList<Object>>) cSink.collectedTuples.get(0);
-		Assert.assertEquals("size of sink map", 1, map.size());
-	}
+    HashMap<String, ArrayList<Object>> map = (HashMap<String, ArrayList<Object>>)cSink.collectedTuples.get(0);
+    Assert.assertEquals("size of sink map", 1, map.size());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/CounterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/stream/CounterTest.java b/library/src/test/java/com/datatorrent/lib/stream/CounterTest.java
index c9837cf..dc73a75 100644
--- a/library/src/test/java/com/datatorrent/lib/stream/CounterTest.java
+++ b/library/src/test/java/com/datatorrent/lib/stream/CounterTest.java
@@ -27,33 +27,34 @@ import com.datatorrent.lib.testbench.CountTestSink;
  * Functional test for {@link com.datatorrent.lib.stream.Counter}<p>
  * <br>
  */
-public class CounterTest {
-
-    /**
-     * Test oper pass through. The Object passed is not relevant
-     */
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    @Test
-    public void testNodeProcessing() throws Exception
-    {
-      Counter oper = new Counter();
-      CountTestSink cSink = new CountTestSink();
-
-      oper.output.setSink(cSink);
-      int numtuples = 100;
-
-      oper.beginWindow(0);
-      for (int i = 0; i < numtuples; i++) {
-        oper.input.process(i);
-      }
-      oper.endWindow();
-
-      oper.beginWindow(1);
-      for (int i = 0; i < numtuples; i++) {
-        oper.input.process(i);
-      }
-      oper.endWindow();
-
-      Assert.assertEquals("number emitted tuples", 2, cSink.getCount());
+public class CounterTest
+{
+
+  /**
+   * Test oper pass through. The Object passed is not relevant
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    Counter oper = new Counter();
+    CountTestSink cSink = new CountTestSink();
+
+    oper.output.setSink(cSink);
+    int numtuples = 100;
+
+    oper.beginWindow(0);
+    for (int i = 0; i < numtuples; i++) {
+      oper.input.process(i);
     }
+    oper.endWindow();
+
+    oper.beginWindow(1);
+    for (int i = 0; i < numtuples; i++) {
+      oper.input.process(i);
+    }
+    oper.endWindow();
+
+    Assert.assertEquals("number emitted tuples", 2, cSink.getCount());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/DevNullCounterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/stream/DevNullCounterTest.java b/library/src/test/java/com/datatorrent/lib/stream/DevNullCounterTest.java
index 0cb5f42..6266787 100644
--- a/library/src/test/java/com/datatorrent/lib/stream/DevNullCounterTest.java
+++ b/library/src/test/java/com/datatorrent/lib/stream/DevNullCounterTest.java
@@ -18,13 +18,12 @@
  */
 package com.datatorrent.lib.stream;
 
-import com.datatorrent.lib.stream.DevNullCounter;
-import com.datatorrent.lib.testbench.EventGenerator;
-
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.lib.testbench.EventGenerator;
+
 /**
  * 
  * Functional tests for {@link com.datatorrent.lib.testbench.DevNullCounter}.
@@ -41,29 +40,26 @@ import org.slf4j.LoggerFactory;
 public class DevNullCounterTest
 {
 
-	private static Logger LOG = LoggerFactory.getLogger(EventGenerator.class);
+  private static Logger LOG = LoggerFactory.getLogger(EventGenerator.class);
 
-	/**
-	 * Tests both string and non string schema
-	 */
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
-	public void testSingleSchemaNodeProcessing() throws Exception
-	{
-		DevNullCounter oper = new DevNullCounter();
-		oper.setRollingwindowcount(5);
-		oper.setup(null);
+  /**
+   * Tests both string and non string schema
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testSingleSchemaNodeProcessing() throws Exception
+  {
+    DevNullCounter oper = new DevNullCounter();
+    oper.setRollingwindowcount(5);
+    oper.setup(null);
 
-		oper.beginWindow(0);
-		long numtuples = 1000000;
-		Object o = new Object();
-		for (long i = 0; i < numtuples; i++) {
-			oper.data.process(o);
-		}
-		oper.endWindow();
-		LOG.info(String
-				.format(
-						"\n*******************************************************\nnumtuples(%d)",
-						numtuples));
-	}
+    oper.beginWindow(0);
+    long numtuples = 1000000;
+    Object o = new Object();
+    for (long i = 0; i < numtuples; i++) {
+      oper.data.process(o);
+    }
+    oper.endWindow();
+    LOG.info(String.format("\n*******************************************************\nnumtuples(%d)", numtuples));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/DevNullTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/stream/DevNullTest.java b/library/src/test/java/com/datatorrent/lib/stream/DevNullTest.java
index 4695d69..3bd9c11 100644
--- a/library/src/test/java/com/datatorrent/lib/stream/DevNullTest.java
+++ b/library/src/test/java/com/datatorrent/lib/stream/DevNullTest.java
@@ -18,26 +18,25 @@
  */
 package com.datatorrent.lib.stream;
 
-import com.datatorrent.lib.stream.DevNull;
-import com.datatorrent.lib.testbench.EventGenerator;
-
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.lib.testbench.EventGenerator;
+
 /**
  * Functional tests for {@link com.datatorrent.lib.testbench.DevNull}. 
  */
-public class DevNullTest {
-
-    private static Logger LOG = LoggerFactory.getLogger(EventGenerator.class);
+public class DevNullTest
+{
 
+  private static Logger LOG = LoggerFactory.getLogger(EventGenerator.class);
 
   /**
    * Tests both string and non string schema
    */
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Test
   public void testSingleSchemaNodeProcessing() throws Exception
   {
     DevNull oper = new DevNull();
@@ -49,6 +48,6 @@ public class DevNullTest {
       oper.data.process(o);
     }
     oper.endWindow();
-    LOG.info(String.format("\n*******************************************************\nnumtuples(%d)",  numtuples));
+    LOG.info(String.format("\n*******************************************************\nnumtuples(%d)", numtuples));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/HashMapToKeyValPairTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/stream/HashMapToKeyValPairTest.java b/library/src/test/java/com/datatorrent/lib/stream/HashMapToKeyValPairTest.java
index 99305e3..314eb01 100644
--- a/library/src/test/java/com/datatorrent/lib/stream/HashMapToKeyValPairTest.java
+++ b/library/src/test/java/com/datatorrent/lib/stream/HashMapToKeyValPairTest.java
@@ -31,35 +31,35 @@ import com.datatorrent.lib.testbench.CountTestSink;
 public class HashMapToKeyValPairTest
 {
 
-	/**
-	 * Test oper pass through. The Object passed is not relevant
-	 */
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
-	public void testNodeProcessing() throws Exception
-	{
-		HashMapToKeyValPair oper = new HashMapToKeyValPair();
-		CountTestSink keySink = new CountTestSink();
-		CountTestSink valSink = new CountTestSink();
-		CountTestSink keyvalSink = new CountTestSink();
+  /**
+   * Test oper pass through. The Object passed is not relevant
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    HashMapToKeyValPair oper = new HashMapToKeyValPair();
+    CountTestSink keySink = new CountTestSink();
+    CountTestSink valSink = new CountTestSink();
+    CountTestSink keyvalSink = new CountTestSink();
 
-		oper.key.setSink(keySink);
-		oper.val.setSink(valSink);
-		oper.keyval.setSink(keyvalSink);
+    oper.key.setSink(keySink);
+    oper.val.setSink(valSink);
+    oper.keyval.setSink(keyvalSink);
 
-		oper.beginWindow(0);
-		HashMap<String, String> input = new HashMap<String, String>();
-		input.put("a", "1");
-		// Same input object can be used as the oper is just pass through
-		int numtuples = 1000;
-		for (int i = 0; i < numtuples; i++) {
-			oper.data.process(input);
-		}
+    oper.beginWindow(0);
+    HashMap<String, String> input = new HashMap<String, String>();
+    input.put("a", "1");
+    // Same input object can be used as the oper is just pass through
+    int numtuples = 1000;
+    for (int i = 0; i < numtuples; i++) {
+      oper.data.process(input);
+    }
 
-		oper.endWindow();
+    oper.endWindow();
 
-		Assert.assertEquals("number emitted tuples", numtuples, keySink.count);
-		Assert.assertEquals("number emitted tuples", numtuples, valSink.count);
-		Assert.assertEquals("number emitted tuples", numtuples, keyvalSink.count);
-	}
+    Assert.assertEquals("number emitted tuples", numtuples, keySink.count);
+    Assert.assertEquals("number emitted tuples", numtuples, valSink.count);
+    Assert.assertEquals("number emitted tuples", numtuples, keyvalSink.count);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/JsonByteArrayOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/stream/JsonByteArrayOperatorTest.java b/library/src/test/java/com/datatorrent/lib/stream/JsonByteArrayOperatorTest.java
index a46eee7..e376c95 100644
--- a/library/src/test/java/com/datatorrent/lib/stream/JsonByteArrayOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/stream/JsonByteArrayOperatorTest.java
@@ -32,76 +32,75 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
  */
 public class JsonByteArrayOperatorTest
 {
-   /**
-     * Test json byte array to HashMap operator pass through. The Object passed is not relevant
-     */
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-		@Test
-    public void testOperator() throws Exception
-    {
-      JsonByteArrayOperator oper = new JsonByteArrayOperator();
-      oper.setConcatenationCharacter('.');
-
-      CollectorTestSink mapSink = new CollectorTestSink();
-      CollectorTestSink jsonObjectSink = new CollectorTestSink();
-      CollectorTestSink flatMapSink = new CollectorTestSink();
-
-      oper.outputMap.setSink(mapSink);
-      oper.outputJsonObject.setSink(jsonObjectSink);
-      oper.outputFlatMap.setSink(flatMapSink);
-
-      oper.beginWindow(0);
-
-      // input test json string
-      String inputJson  = " {   \"@timestamp\":\"2013-09-25T19:37:23.569Z\""
-                        + "      ,\"@version\":\"1\""
-                        + "          ,\"type\":\"apache-logs\""
-                        + "          ,\"host\":\"node1001\""
-                        + "      ,\"clientip\":192.168.150.120"
-                        + "          ,\"verb\":\"GET\""
-                        + "       ,\"request\":\"/reset.css\""
-                        + "   ,\"httpversion\":\"1.1\""
-                        + "      ,\"response\":200"
-                        + "     ,\"agentinfo\": {\"browser\":Firefox"
-                        + "                          ,\"os\": {    \"name\":\"Ubuntu\""
-                        + "                                    ,\"version\":\"10.04\""
-                        + "                                   }"
-                        + "                     }"
-                        + "         ,\"bytes\":909.1"
-                        + " }";
-
-      byte[] inputByteArray = inputJson.getBytes();
-
-      // run the operator for the same string 1000 times
-      int numtuples = 1000;
-      for (int i = 0; i < numtuples; i++) {
-        oper.input.process(inputByteArray);
-      }
-
-      oper.endWindow();
-
-      // assert that the number of the operator generates is 1000
-      Assert.assertEquals("number emitted tuples", numtuples, mapSink.collectedTuples.size());
-      Assert.assertEquals("number emitted tuples", numtuples, jsonObjectSink.collectedTuples.size());
-      Assert.assertEquals("number emitted tuples", numtuples, flatMapSink.collectedTuples.size());
-
-      // assert that value for one of the keys in any one of the objects from mapSink is as expected
-      Object map = mapSink.collectedTuples.get(510);
-      String expectedClientip = "192.168.150.120";
-      Assert.assertEquals("emitted tuple", expectedClientip, ((Map)map).get("clientip"));
-
-
-      // assert that value for one of the keys in any one of the objects from jsonObjectSink is as expected
-      Object jsonObject = jsonObjectSink.collectedTuples.get(433);
-      Number expectedResponse = 200;
-      Assert.assertEquals("emitted tuple", expectedResponse, ((JSONObject)jsonObject).get("response"));
-
-      // assert that value for one of the keys in any one of the objects from flatMapSink is as expected
-      Map flatMap = (Map)flatMapSink.collectedTuples.get(511);
-      String expectedBrowser = "Firefox";
-      String expectedOsName = "Ubuntu";
-      Assert.assertEquals("emitted tuple", expectedBrowser, flatMap.get("agentinfo.browser"));
-      Assert.assertEquals("emitted tuple", expectedOsName, flatMap.get("agentinfo.os.name"));
+  /**
+   * Test json byte array to HashMap operator pass through. The Object passed is not relevant
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Test
+  public void testOperator() throws Exception
+  {
+    JsonByteArrayOperator oper = new JsonByteArrayOperator();
+    oper.setConcatenationCharacter('.');
+
+    CollectorTestSink mapSink = new CollectorTestSink();
+    CollectorTestSink jsonObjectSink = new CollectorTestSink();
+    CollectorTestSink flatMapSink = new CollectorTestSink();
+
+    oper.outputMap.setSink(mapSink);
+    oper.outputJsonObject.setSink(jsonObjectSink);
+    oper.outputFlatMap.setSink(flatMapSink);
+
+    oper.beginWindow(0);
+
+    // input test json string
+    String inputJson = " {   \"@timestamp\":\"2013-09-25T19:37:23.569Z\""
+        + "      ,\"@version\":\"1\""
+        + "          ,\"type\":\"apache-logs\""
+        + "          ,\"host\":\"node1001\""
+        + "      ,\"clientip\":192.168.150.120"
+        + "          ,\"verb\":\"GET\""
+        + "       ,\"request\":\"/reset.css\""
+        + "   ,\"httpversion\":\"1.1\""
+        + "      ,\"response\":200"
+        + "     ,\"agentinfo\": {\"browser\":Firefox"
+        + "                          ,\"os\": {    \"name\":\"Ubuntu\""
+        + "                                    ,\"version\":\"10.04\""
+        + "                                   }"
+        + "                     }"
+        + "         ,\"bytes\":909.1"
+        + " }";
+
+    byte[] inputByteArray = inputJson.getBytes();
+
+    // run the operator for the same string 1000 times
+    int numtuples = 1000;
+    for (int i = 0; i < numtuples; i++) {
+      oper.input.process(inputByteArray);
     }
 
+    oper.endWindow();
+
+    // assert that the number of tuples the operator emits on each output port is 1000
+    Assert.assertEquals("number emitted tuples", numtuples, mapSink.collectedTuples.size());
+    Assert.assertEquals("number emitted tuples", numtuples, jsonObjectSink.collectedTuples.size());
+    Assert.assertEquals("number emitted tuples", numtuples, flatMapSink.collectedTuples.size());
+
+    // assert that value for one of the keys in any one of the objects from mapSink is as expected
+    Object map = mapSink.collectedTuples.get(510);
+    String expectedClientip = "192.168.150.120";
+    Assert.assertEquals("emitted tuple", expectedClientip, ((Map)map).get("clientip"));
+
+    // assert that value for one of the keys in any one of the objects from jsonObjectSink is as expected
+    Object jsonObject = jsonObjectSink.collectedTuples.get(433);
+    Number expectedResponse = 200;
+    Assert.assertEquals("emitted tuple", expectedResponse, ((JSONObject)jsonObject).get("response"));
+
+    // assert that value for one of the keys in any one of the objects from flatMapSink is as expected
+    Map flatMap = (Map)flatMapSink.collectedTuples.get(511);
+    String expectedBrowser = "Firefox";
+    String expectedOsName = "Ubuntu";
+    Assert.assertEquals("emitted tuple", expectedBrowser, flatMap.get("agentinfo.browser"));
+    Assert.assertEquals("emitted tuple", expectedOsName, flatMap.get("agentinfo.os.name"));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/KeyPairToHashMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/stream/KeyPairToHashMapTest.java b/library/src/test/java/com/datatorrent/lib/stream/KeyPairToHashMapTest.java
index 2b5a583..2d0595f 100644
--- a/library/src/test/java/com/datatorrent/lib/stream/KeyPairToHashMapTest.java
+++ b/library/src/test/java/com/datatorrent/lib/stream/KeyPairToHashMapTest.java
@@ -32,28 +32,28 @@ import com.datatorrent.lib.util.KeyValPair;
 public class KeyPairToHashMapTest
 {
 
-	/**
-	 * Test oper pass through. The Object passed is not relevant
-	 */
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	@Test
-	public void testNodeProcessing() throws Exception
-	{
-		KeyValPairToHashMap oper = new KeyValPairToHashMap();
-		CountTestSink mapSink = new CountTestSink();
-
-		oper.map.setSink(mapSink);
-
-		oper.beginWindow(0);
-		KeyValPair<String, String> input = new KeyValPair<String, String>("a", "1");
-
-		// Same input object can be used as the oper is just pass through
-		int numtuples = 1000;
-		for (int i = 0; i < numtuples; i++) {
-			oper.keyval.process(input);
-		}
-		oper.endWindow();
-
-		Assert.assertEquals("number emitted tuples", numtuples, mapSink.count);
-	}
+  /**
+   * Test oper pass through. The Object passed is not relevant
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    KeyValPairToHashMap oper = new KeyValPairToHashMap();
+    CountTestSink mapSink = new CountTestSink();
+
+    oper.map.setSink(mapSink);
+
+    oper.beginWindow(0);
+    KeyValPair<String, String> input = new KeyValPair<String, String>("a", "1");
+
+    // Same input object can be used as the oper is just pass through
+    int numtuples = 1000;
+    for (int i = 0; i < numtuples; i++) {
+      oper.keyval.process(input);
+    }
+    oper.endWindow();
+
+    Assert.assertEquals("number emitted tuples", numtuples, mapSink.count);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/RoundRobinHashMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/stream/RoundRobinHashMapTest.java b/library/src/test/java/com/datatorrent/lib/stream/RoundRobinHashMapTest.java
index c26c8d0..d78e369 100644
--- a/library/src/test/java/com/datatorrent/lib/stream/RoundRobinHashMapTest.java
+++ b/library/src/test/java/com/datatorrent/lib/stream/RoundRobinHashMapTest.java
@@ -33,59 +33,59 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
 public class RoundRobinHashMapTest
 {
 
-	private static Logger log = LoggerFactory
-			.getLogger(RoundRobinHashMapTest.class);
+  private static Logger log = LoggerFactory
+      .getLogger(RoundRobinHashMapTest.class);
 
-	/**
-	 * Test operator pass through. The Object passed is not relevant
-	 */
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
-	public void testNodeProcessing() throws Exception
-	{
-		RoundRobinHashMap oper = new RoundRobinHashMap();
-		CollectorTestSink mapSink = new CollectorTestSink();
+  /**
+   * Test operator pass through. The Object passed is not relevant
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    RoundRobinHashMap oper = new RoundRobinHashMap();
+    CollectorTestSink mapSink = new CollectorTestSink();
 
-		String[] keys = new String[3];
-		keys[0] = "a";
-		keys[1] = "b";
-		keys[2] = "c";
+    String[] keys = new String[3];
+    keys[0] = "a";
+    keys[1] = "b";
+    keys[2] = "c";
 
-		oper.setKeys(keys);
-		oper.map.setSink(mapSink);
-		oper.beginWindow(0);
+    oper.setKeys(keys);
+    oper.map.setSink(mapSink);
+    oper.beginWindow(0);
 
-		HashMap<String, Integer> t1 = new HashMap<String, Integer>();
-		t1.put("a", 0);
-		t1.put("b", 1);
-		t1.put("c", 2);
-		HashMap<String, Integer> t2 = new HashMap<String, Integer>();
-		t2.put("a", 3);
-		t2.put("b", 4);
-		t2.put("c", 5);
-		HashMap<String, Integer> t3 = new HashMap<String, Integer>();
-		t3.put("a", 6);
-		t3.put("b", 7);
-		t3.put("c", 8);
+    HashMap<String, Integer> t1 = new HashMap<String, Integer>();
+    t1.put("a", 0);
+    t1.put("b", 1);
+    t1.put("c", 2);
+    HashMap<String, Integer> t2 = new HashMap<String, Integer>();
+    t2.put("a", 3);
+    t2.put("b", 4);
+    t2.put("c", 5);
+    HashMap<String, Integer> t3 = new HashMap<String, Integer>();
+    t3.put("a", 6);
+    t3.put("b", 7);
+    t3.put("c", 8);
 
-		HashMap<String, Integer> t4 = new HashMap<String, Integer>();
-		t4.put("a", 9);
-		t4.put("b", 10);
-		t4.put("c", 11);
+    HashMap<String, Integer> t4 = new HashMap<String, Integer>();
+    t4.put("a", 9);
+    t4.put("b", 10);
+    t4.put("c", 11);
 
-		// Same input object can be used as the oper is just pass through
-		int numtuples = 12;
-		for (int i = 0; i < numtuples; i++) {
-			oper.data.process(i);
-		}
-		oper.endWindow();
+    // Feed consecutive integers; every three values are expected to be grouped into one map keyed a, b, c
+    int numtuples = 12;
+    for (int i = 0; i < numtuples; i++) {
+      oper.data.process(i);
+    }
+    oper.endWindow();
 
-		Assert.assertEquals("number emitted tuples", numtuples / 3,
-				mapSink.collectedTuples.size());
-		log.debug(mapSink.collectedTuples.toString());
-		Assert.assertEquals("tuple 1", t1, mapSink.collectedTuples.get(0));
-		Assert.assertEquals("tuple 2", t2, mapSink.collectedTuples.get(1));
-		Assert.assertEquals("tuple 3", t3, mapSink.collectedTuples.get(2));
-		Assert.assertEquals("tuple 4", t4, mapSink.collectedTuples.get(3));
-	}
+    Assert.assertEquals("number emitted tuples", numtuples / 3,
+        mapSink.collectedTuples.size());
+    log.debug(mapSink.collectedTuples.toString());
+    Assert.assertEquals("tuple 1", t1, mapSink.collectedTuples.get(0));
+    Assert.assertEquals("tuple 2", t2, mapSink.collectedTuples.get(1));
+    Assert.assertEquals("tuple 3", t3, mapSink.collectedTuples.get(2));
+    Assert.assertEquals("tuple 4", t4, mapSink.collectedTuples.get(3));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/StreamDuplicaterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/stream/StreamDuplicaterTest.java b/library/src/test/java/com/datatorrent/lib/stream/StreamDuplicaterTest.java
index 1c05b6c..7586c70 100644
--- a/library/src/test/java/com/datatorrent/lib/stream/StreamDuplicaterTest.java
+++ b/library/src/test/java/com/datatorrent/lib/stream/StreamDuplicaterTest.java
@@ -28,34 +28,35 @@ import com.datatorrent.lib.testbench.CountTestSink;
  * Benchmarks: Currently does about ?? Million tuples/sec in debugging environment. Need to test on larger nodes<br>
  * <br>
  */
-public class StreamDuplicaterTest {
-
-    /**
-     * Test oper pass through. The Object passed is not relevant
-     */
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    @Test
-    public void testNodeProcessing() throws Exception
-    {
-      StreamDuplicater oper = new StreamDuplicater();
-      CountTestSink mergeSink1 = new CountTestSink();
-      CountTestSink mergeSink2 = new CountTestSink();
-
-      oper.out1.setSink(mergeSink1);
-      oper.out2.setSink(mergeSink2);
-
-      oper.beginWindow(0);
-      int numtuples = 1000;
-      Integer input = new Integer(0);
-      // Same input object can be used as the oper is just pass through
-      for (int i = 0; i < numtuples; i++) {
-        oper.data.process(input);
-      }
-
-      oper.endWindow();
-
-      // One for each key
-      Assert.assertEquals("number emitted tuples", numtuples, mergeSink1.count);
-      Assert.assertEquals("number emitted tuples", numtuples, mergeSink2.count);
+public class StreamDuplicaterTest
+{
+
+  /**
+   * Test oper pass through. The Object passed is not relevant
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    StreamDuplicater oper = new StreamDuplicater();
+    CountTestSink mergeSink1 = new CountTestSink();
+    CountTestSink mergeSink2 = new CountTestSink();
+
+    oper.out1.setSink(mergeSink1);
+    oper.out2.setSink(mergeSink2);
+
+    oper.beginWindow(0);
+    int numtuples = 1000;
+    Integer input = 0;
+    // Same input object can be used as the oper is just pass through
+    for (int i = 0; i < numtuples; i++) {
+      oper.data.process(input);
     }
+
+    oper.endWindow();
+
+    // Each output sink is expected to have received every input tuple
+    Assert.assertEquals("number emitted tuples", numtuples, mergeSink1.count);
+    Assert.assertEquals("number emitted tuples", numtuples, mergeSink2.count);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/StreamMergerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/stream/StreamMergerTest.java b/library/src/test/java/com/datatorrent/lib/stream/StreamMergerTest.java
index fe626f4..995dc6f 100644
--- a/library/src/test/java/com/datatorrent/lib/stream/StreamMergerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/stream/StreamMergerTest.java
@@ -43,7 +43,7 @@ public class StreamMergerTest
 
     oper.beginWindow(0);
     int numtuples = 500;
-    Integer input = new Integer(0);
+    Integer input = 0;
     // Same input object can be used as the oper is just pass through
     for (int i = 0; i < numtuples; i++) {
       oper.data1.process(input);
@@ -51,6 +51,6 @@ public class StreamMergerTest
     }
 
     oper.endWindow();
-    Assert.assertEquals("number emitted tuples", numtuples*2, mergeSink.count);
+    Assert.assertEquals("number emitted tuples", numtuples * 2, mergeSink.count);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java
index b0d3c01..1f29d1d 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java
@@ -21,8 +21,9 @@ package com.datatorrent.lib.streamquery;
 import java.util.HashMap;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import com.datatorrent.lib.streamquery.DeleteOperator;
 import com.datatorrent.lib.streamquery.condition.EqualValueCondition;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 
@@ -31,44 +32,46 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
  */
 public class DeleteOperatorTest
 {
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
   public void testSqlSelect()
   {
-  	// create operator   
-	DeleteOperator oper = new DeleteOperator();
-  	
-  	EqualValueCondition  condition = new EqualValueCondition();
-  	condition.addEqualValue("a", 1);
-  	oper.setCondition(condition);
-  	
-  	CollectorTestSink sink = new CollectorTestSink();
-  	oper.outport.setSink(sink);
-  	
-  	oper.setup(null);
-  	oper.beginWindow(1);
-  	
-  	HashMap<String, Object> tuple = new HashMap<String, Object>();
-  	tuple.put("a", 0);
-  	tuple.put("b", 1);
-  	tuple.put("c", 2);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 3);
-  	tuple.put("c", 4);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 5);
-  	tuple.put("c", 6);
-  	oper.inport.process(tuple);
-  	
-  	oper.endWindow();
-  	oper.teardown();
-  	
-  	System.out.println(sink.collectedTuples.toString());
+    // create operator
+    DeleteOperator oper = new DeleteOperator();
+
+    EqualValueCondition  condition = new EqualValueCondition();
+    condition.addEqualValue("a", 1);
+    oper.setCondition(condition);
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
   }
+
+  private static final Logger LOG = LoggerFactory.getLogger(DeleteOperatorTest.class);
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java
index 97d587b..728fb96 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery;
 import java.util.HashMap;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.streamquery.condition.Condition;
 import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
@@ -29,16 +31,16 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
 
 public class FullOuterJoinOperatorTest
 {
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
   public void testSqlSelect()
   {
-  	// create operator   
-		OuterJoinOperator oper = new OuterJoinOperator();	
-		oper.setFullJoin(true);
-  	CollectorTestSink sink = new CollectorTestSink();
-  	oper.outport.setSink(sink);
-  	
+    // create operator
+    OuterJoinOperator oper = new OuterJoinOperator();
+    oper.setFullJoin(true);
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
     // set column join condition  
     Condition cond = new JoinColumnEqualCondition("a", "a");
     oper.setJoinCondition(cond);
@@ -46,43 +48,46 @@ public class FullOuterJoinOperatorTest
     // add columns  
     oper.selectTable1Column(new ColumnIndex("b", null));
     oper.selectTable2Column(new ColumnIndex("c", null));
-  	
-  	oper.setup(null);
-  	oper.beginWindow(1);
-  	
-  	HashMap<String, Object> tuple = new HashMap<String, Object>();
-  	tuple.put("a", 0);
-  	tuple.put("b", 1);
-  	tuple.put("c", 2);
-  	oper.inport1.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 3);
-  	tuple.put("c", 4);
-  	oper.inport1.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 2);
-  	tuple.put("b", 11);
-  	tuple.put("c", 12);
-  	oper.inport1.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 0);
-  	tuple.put("b", 7);
-  	tuple.put("c", 8);
-  	oper.inport2.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 5);
-  	tuple.put("c", 6);
-  	oper.inport2.process(tuple);
-  	
-  	oper.endWindow();
-  	oper.teardown();
-  	
-  	System.out.println(sink.collectedTuples.toString());
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport1.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport1.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 2);
+    tuple.put("b", 11);
+    tuple.put("c", 12);
+    oper.inport1.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 7);
+    tuple.put("c", 8);
+    oper.inport2.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport2.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
   }
+
+  private static final Logger LOG = LoggerFactory.getLogger(FullOuterJoinOperatorTest.class);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java
index 0d4c939..714f93b 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery;
 import java.util.HashMap;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.streamquery.condition.EqualValueCondition;
 import com.datatorrent.lib.streamquery.function.SumFunction;
@@ -32,58 +34,61 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
  */
 public class GroupByOperatorTest
 {
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
   public void testSqlGroupBy()
   {
-  	// create operator   
-	  GroupByHavingOperator oper = new GroupByHavingOperator();
-  	oper.addColumnGroupByIndex(new ColumnIndex("b", null));
-  	try {
+    // create operator
+    GroupByHavingOperator oper = new GroupByHavingOperator();
+    oper.addColumnGroupByIndex(new ColumnIndex("b", null));
+    try {
       oper.addAggregateIndex(new SumFunction("c", null));
     } catch (Exception e) {
       // TODO Auto-generated catch block
       e.printStackTrace();
       return;
     }
-  	
-  	EqualValueCondition  condition = new EqualValueCondition();
-  	condition.addEqualValue("a", 1);
-  	oper.setCondition(condition);
-  	
-  	CollectorTestSink sink = new CollectorTestSink();
-  	oper.outport.setSink(sink);
-  	
-  	oper.setup(null);
-  	oper.beginWindow(1);
-  	
-  	HashMap<String, Object> tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 1);
-  	tuple.put("c", 2);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 1);
-  	tuple.put("c", 4);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 2);
-  	tuple.put("c", 6);
-  	oper.inport.process(tuple);
-  	
+
+    EqualValueCondition  condition = new EqualValueCondition();
+    condition.addEqualValue("a", 1);
+    oper.setCondition(condition);
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 1);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 2);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
     tuple = new HashMap<String, Object>();
     tuple.put("a", 1);
     tuple.put("b", 2);
     tuple.put("c", 7);
     oper.inport.process(tuple);
     
-  	oper.endWindow();
-  	oper.teardown();
-  	
-  	System.out.println(sink.collectedTuples.toString());
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
   }
+
+  private static final Logger LOG = LoggerFactory.getLogger(GroupByOperatorTest.class);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java
index 7ccb2ed..e11723d 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery;
 import java.util.HashMap;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.streamquery.condition.EqualValueCondition;
 import com.datatorrent.lib.streamquery.condition.HavingCompareValue;
@@ -35,57 +37,60 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
  */
 public class HavingOperatorTest
 {
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
   public void testSqlGroupBy() throws Exception
   {
-  	// create operator   
-	  GroupByHavingOperator oper = new GroupByHavingOperator();
-  	oper.addColumnGroupByIndex(new ColumnIndex("b", null));
-  	FunctionIndex sum = new SumFunction("c", null);
+    // create operator
+    GroupByHavingOperator oper = new GroupByHavingOperator();
+    oper.addColumnGroupByIndex(new ColumnIndex("b", null));
+    FunctionIndex sum = new SumFunction("c", null);
     oper.addAggregateIndex(sum);
 
     // create having condition
     HavingCondition having = new HavingCompareValue<Double>(sum, 6.0, 0);
     oper.addHavingCondition(having);
-  	
-  	EqualValueCondition  condition = new EqualValueCondition();
-  	condition.addEqualValue("a", 1);
-  	oper.setCondition(condition);
-  	
-  	CollectorTestSink sink = new CollectorTestSink();
-  	oper.outport.setSink(sink);
-  	
-  	oper.setup(null);
-  	oper.beginWindow(1);
-  	
-  	HashMap<String, Object> tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 1);
-  	tuple.put("c", 2);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 1);
-  	tuple.put("c", 4);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 2);
-  	tuple.put("c", 6);
-  	oper.inport.process(tuple);
-  	
+
+    EqualValueCondition  condition = new EqualValueCondition();
+    condition.addEqualValue("a", 1);
+    oper.setCondition(condition);
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 1);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 2);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
     tuple = new HashMap<String, Object>();
     tuple.put("a", 1);
     tuple.put("b", 2);
     tuple.put("c", 7);
     oper.inport.process(tuple);
     
-  	oper.endWindow();
-  	oper.teardown();
-  	
-  	System.out.println(sink.collectedTuples.toString());
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
   }
+
+  private static final Logger LOG = LoggerFactory.getLogger(HavingOperatorTest.class);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java
index 2f14f16..8a022ee 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery;
 import java.util.HashMap;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.streamquery.condition.Condition;
 import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
@@ -34,53 +36,56 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
  */
 public class InnerJoinOperatorTest
 {
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
   public void testSqlSelect()
   {
-  	// create operator   
-		InnerJoinOperator oper = new InnerJoinOperator();	
-  	CollectorTestSink sink = new CollectorTestSink();
-  	oper.outport.setSink(sink);
-  	
-  	// set column join condition  
-  	Condition cond = new JoinColumnEqualCondition("a", "a");
-  	oper.setJoinCondition(cond);
-  	
-  	// add columns  
-  	oper.selectTable1Column(new ColumnIndex("b", null));
-  	oper.selectTable2Column(new ColumnIndex("c", null));
-  	
-  	oper.setup(null);
-  	oper.beginWindow(1);
-  	
-  	HashMap<String, Object> tuple = new HashMap<String, Object>();
-  	tuple.put("a", 0);
-  	tuple.put("b", 1);
-  	tuple.put("c", 2);
-  	oper.inport1.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 3);
-  	tuple.put("c", 4);
-  	oper.inport1.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 0);
-  	tuple.put("b", 7);
-  	tuple.put("c", 8);
-  	oper.inport2.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 5);
-  	tuple.put("c", 6);
-  	oper.inport2.process(tuple);
-  	
-  	oper.endWindow();
-  	oper.teardown();
-  	
-  	System.out.println(sink.collectedTuples.toString());
+    // create operator
+    InnerJoinOperator oper = new InnerJoinOperator();
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    // set column join condition
+    Condition cond = new JoinColumnEqualCondition("a", "a");
+    oper.setJoinCondition(cond);
+
+    // add columns
+    oper.selectTable1Column(new ColumnIndex("b", null));
+    oper.selectTable2Column(new ColumnIndex("c", null));
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport1.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport1.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 7);
+    tuple.put("c", 8);
+    oper.inport2.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport2.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
   }
+
+  private static final Logger LOG = LoggerFactory.getLogger(InnerJoinOperatorTest.class);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java
index 32e5b13..aa25e87 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery;
 import java.util.HashMap;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.streamquery.condition.Condition;
 import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
@@ -29,15 +31,15 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
 
 public class LeftOuterJoinOperatorTest
 {
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
   public void testSqlSelect()
   {
-  	// create operator   
-		OuterJoinOperator oper = new OuterJoinOperator();	
-  	CollectorTestSink sink = new CollectorTestSink();
-  	oper.outport.setSink(sink);
-  	
+    // create operator
+    OuterJoinOperator oper = new OuterJoinOperator();
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
     // set column join condition  
     Condition cond = new JoinColumnEqualCondition("a", "a");
     oper.setJoinCondition(cond);
@@ -45,43 +47,46 @@ public class LeftOuterJoinOperatorTest
     // add columns  
     oper.selectTable1Column(new ColumnIndex("b", null));
     oper.selectTable2Column(new ColumnIndex("c", null));
-  	
-  	oper.setup(null);
-  	oper.beginWindow(1);
-  	
-  	HashMap<String, Object> tuple = new HashMap<String, Object>();
-  	tuple.put("a", 0);
-  	tuple.put("b", 1);
-  	tuple.put("c", 2);
-  	oper.inport1.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 3);
-  	tuple.put("c", 4);
-  	oper.inport1.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 2);
-  	tuple.put("b", 11);
-  	tuple.put("c", 12);
-  	oper.inport1.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 0);
-  	tuple.put("b", 7);
-  	tuple.put("c", 8);
-  	oper.inport2.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 5);
-  	tuple.put("c", 6);
-  	oper.inport2.process(tuple);
-  	
-  	oper.endWindow();
-  	oper.teardown();
-  	
-  	System.out.println(sink.collectedTuples.toString());
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport1.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport1.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 2);
+    tuple.put("b", 11);
+    tuple.put("c", 12);
+    oper.inport1.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 7);
+    tuple.put("c", 8);
+    oper.inport2.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport2.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
   }
+
+  private static final Logger LOG = LoggerFactory.getLogger(LeftOuterJoinOperatorTest.class);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java
index b233290..2d7ba87 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery;
 import java.util.HashMap;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.testbench.CollectorTestSink;
 
@@ -29,60 +31,63 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
  */
 public class OrderByOperatorTest
 {
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
   public void testSqlSelect()
   {
-  	// craete operator   
+    // create operator
     OrderByOperator oper = new OrderByOperator();
-  	
-  	CollectorTestSink sink = new CollectorTestSink();
-  	oper.outport.setSink(sink);
-  	oper.addOrderByRule(new OrderByRule<Integer>("b"));
-  	oper.setDescending(true);
-  	
-  	oper.setup(null);
-  	oper.beginWindow(1);
-  	
-  	HashMap<String, Object> tuple = new HashMap<String, Object>();
-  	tuple.put("c", 2);
-  	tuple.put("a", 0);
-  	tuple.put("b", 1);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 2);
-  	tuple.put("b", 5);
-  	tuple.put("c", 6);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 2);
-  	tuple.put("b", 6);
-  	tuple.put("c", 6);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 3);
-  	tuple.put("c", 4);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 4);
-  	tuple.put("c", 4);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 8);
-  	tuple.put("c", 4);
-  	oper.inport.process(tuple);
-  	
-  	oper.endWindow();
-  	oper.teardown();
-  	
-  	System.out.println(sink.collectedTuples.toString());
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+    oper.addOrderByRule(new OrderByRule<Integer>("b"));
+    oper.setDescending(true);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("c", 2);
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 2);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 2);
+    tuple.put("b", 6);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 4);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 8);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
   }
+
+  private static final Logger LOG = LoggerFactory.getLogger(OrderByOperatorTest.class);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java
index f99ee25..3a57427 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery;
 import java.util.HashMap;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.streamquery.condition.Condition;
 import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
@@ -29,16 +31,16 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
 
 public class RightOuterJoinOperatorTest
 {
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
   public void testSqlSelect()
   {
-  	// create operator   
-		OuterJoinOperator oper = new OuterJoinOperator();	
-		oper.setRighttJoin();
-  	CollectorTestSink sink = new CollectorTestSink();
-  	oper.outport.setSink(sink);
-  	
+    // create operator
+    OuterJoinOperator oper = new OuterJoinOperator();
+    oper.setRighttJoin();
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
     // set column join condition  
     Condition cond = new JoinColumnEqualCondition("a", "a");
     oper.setJoinCondition(cond);
@@ -46,44 +48,47 @@ public class RightOuterJoinOperatorTest
     // add columns  
     oper.selectTable1Column(new ColumnIndex("b", null));
     oper.selectTable2Column(new ColumnIndex("c", null));
-  	
-  	oper.setup(null);
-  	oper.beginWindow(1);
-  	
-  	HashMap<String, Object> tuple = new HashMap<String, Object>();
-  	tuple.put("a", 0);
-  	tuple.put("b", 1);
-  	tuple.put("c", 2);
-  	oper.inport1.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 3);
-  	tuple.put("c", 4);
-  	oper.inport1.process(tuple);
-  	
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 0);
-  	tuple.put("b", 7);
-  	tuple.put("c", 8);
-  	oper.inport2.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 5);
-  	tuple.put("c", 6);
-  	oper.inport2.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport1.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport1.process(tuple);
+
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 7);
+    tuple.put("c", 8);
+    oper.inport2.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport2.process(tuple);
+
+    tuple = new HashMap<String, Object>();
     tuple.put("a", 2);
     tuple.put("b", 11);
     tuple.put("c", 12);
     oper.inport2.process(tuple);
     
-  	oper.endWindow();
-  	oper.teardown();
-  	
-  	System.out.println(sink.collectedTuples.toString());
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
   }
+
+  private static final Logger LOG = LoggerFactory.getLogger(RightOuterJoinOperatorTest.class);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java
index 3ac18f8..8e6620e 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java
@@ -21,8 +21,9 @@ package com.datatorrent.lib.streamquery;
 import java.util.HashMap;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import com.datatorrent.lib.streamquery.SelectOperator;
 import com.datatorrent.lib.streamquery.condition.EqualValueCondition;
 import com.datatorrent.lib.streamquery.index.ColumnIndex;
 import com.datatorrent.lib.testbench.CollectorTestSink;
@@ -32,46 +33,49 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
  */
 public class SelectOperatorTest
 {
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
   public void testSqlSelect()
   {
-  	// create operator   
-  	SelectOperator oper = new SelectOperator();
-  	oper.addIndex(new ColumnIndex("b", null));
-  	oper.addIndex(new ColumnIndex("c", null));
-  	
-  	EqualValueCondition  condition = new EqualValueCondition();
-  	condition.addEqualValue("a", 1);
-  	oper.setCondition(condition);
-  	
-  	CollectorTestSink sink = new CollectorTestSink();
-  	oper.outport.setSink(sink);
-  	
-  	oper.setup(null);
-  	oper.beginWindow(1);
-  	
-  	HashMap<String, Object> tuple = new HashMap<String, Object>();
-  	tuple.put("a", 0);
-  	tuple.put("b", 1);
-  	tuple.put("c", 2);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 3);
-  	tuple.put("c", 4);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 5);
-  	tuple.put("c", 6);
-  	oper.inport.process(tuple);
-  	
-  	oper.endWindow();
-  	oper.teardown();
-  	
-  	System.out.println(sink.collectedTuples.toString());
+    // create operator
+    SelectOperator oper = new SelectOperator();
+    oper.addIndex(new ColumnIndex("b", null));
+    oper.addIndex(new ColumnIndex("c", null));
+
+    EqualValueCondition  condition = new EqualValueCondition();
+    condition.addEqualValue("a", 1);
+    oper.setCondition(condition);
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
   }
+
+  private static final Logger LOG = LoggerFactory.getLogger(SelectOperatorTest.class);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java
index 8c894d1..c92c6c1 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery;
 import java.util.HashMap;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.testbench.CollectorTestSink;
 
@@ -54,7 +56,10 @@ public class SelectTopOperatorTest
     tuple.put("c", 6);
     oper.inport.process(tuple);
     oper.endWindow();
-    
-    System.out.println(sink.collectedTuples.toString());
+
+    LOG.debug("{}", sink.collectedTuples);
   }
+
+  private static final Logger LOG = LoggerFactory.getLogger(SelectTopOperatorTest.class);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java
index 70713db..42af56b 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java
@@ -21,52 +21,56 @@ package com.datatorrent.lib.streamquery;
 import java.util.HashMap;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import com.datatorrent.lib.streamquery.UpdateOperator;
 import com.datatorrent.lib.streamquery.condition.EqualValueCondition;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 
 public class UpdateOperatorTest
 {
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
   public void testSqlSelect()
   {
-  	// create operator   
-		UpdateOperator oper = new UpdateOperator();
-  	
-  	EqualValueCondition  condition = new EqualValueCondition();
-  	condition.addEqualValue("a", 1);
-  	oper.setCondition(condition);
-  	oper.addUpdate("c", 100);
-  	
-  	CollectorTestSink sink = new CollectorTestSink();
-  	oper.outport.setSink(sink);
-  	
-  	oper.setup(null);
-  	oper.beginWindow(1);
-  	
-  	HashMap<String, Object> tuple = new HashMap<String, Object>();
-  	tuple.put("a", 0);
-  	tuple.put("b", 1);
-  	tuple.put("c", 2);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 3);
-  	tuple.put("c", 4);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 5);
-  	tuple.put("c", 6);
-  	oper.inport.process(tuple);
-  	
-  	oper.endWindow();
-  	oper.teardown();
-  	
-  	System.out.println(sink.collectedTuples.toString());
+    // create operator
+    UpdateOperator oper = new UpdateOperator();
+
+    EqualValueCondition  condition = new EqualValueCondition();
+    condition.addEqualValue("a", 1);
+    oper.setCondition(condition);
+    oper.addUpdate("c", 100);
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
   }
+
+  private static final Logger LOG = LoggerFactory.getLogger(UpdateOperatorTest.class);
+
 }