Posted to commits@apex.apache.org by th...@apache.org on 2016/05/18 20:41:52 UTC

[02/22] incubator-apex-malhar git commit: APEXMALHAR-2095 removed checkstyle violations of malhar library module

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/advanced/BetweenConditionTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/BetweenConditionTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/advanced/BetweenConditionTest.java
index 0d0ef05..b0500eb 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/BetweenConditionTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/advanced/BetweenConditionTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery.advanced;
 import java.util.HashMap;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.streamquery.SelectOperator;
 import com.datatorrent.lib.streamquery.condition.BetweenCondition;
@@ -32,52 +34,54 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
  */
 public class BetweenConditionTest
 {
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
   public void testSqlSelect()
   {
-  	// create operator   
-  	SelectOperator oper = new SelectOperator();
-  	oper.addIndex(new ColumnIndex("b", null));
-  	oper.addIndex(new ColumnIndex("c", null));
-  	
-  	BetweenCondition cond = new BetweenCondition("a", 0, 2); 
-  	oper.setCondition(cond);
-  	
-  	
-  	CollectorTestSink sink = new CollectorTestSink();
-  	oper.outport.setSink(sink);
-  	
-  	oper.setup(null);
-  	oper.beginWindow(1);
-  	
-  	HashMap<String, Object> tuple = new HashMap<String, Object>();
-  	tuple.put("a", 0);
-  	tuple.put("b", 1);
-  	tuple.put("c", 2);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 3);
-  	tuple.put("c", 4);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 2);
-  	tuple.put("b", 5);
-  	tuple.put("c", 6);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
+    // create operator
+    SelectOperator oper = new SelectOperator();
+    oper.addIndex(new ColumnIndex("b", null));
+    oper.addIndex(new ColumnIndex("c", null));
+
+    BetweenCondition cond = new BetweenCondition("a", 0, 2);
+    oper.setCondition(cond);
+
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 2);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
     tuple.put("a", 3);
     tuple.put("b", 7);
     tuple.put("c", 8);
     oper.inport.process(tuple);
     
-  	oper.endWindow();
-  	oper.teardown();
-  	
-  	System.out.println(sink.collectedTuples.toString());
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
   }
+
+  private static final Logger LOG = LoggerFactory.getLogger(BetweenConditionTest.class);
 }
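
The change repeated across these streamquery test diffs is the replacement of System.out.println with a parameterized SLF4J logger held in a static LOG field. A minimal sketch of the resulting pattern follows; the class and method names are illustrative only and not part of the commit:

    import java.util.List;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ExampleConditionTest
    {
      public void reportCollectedTuples(List<Object> collectedTuples)
      {
        // Parameterized logging: the argument is rendered only when the
        // debug level is enabled, unlike an unconditional println.
        LOG.debug("{}", collectedTuples);
      }

      private static final Logger LOG = LoggerFactory.getLogger(ExampleConditionTest.class);
    }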

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/advanced/CompoundConditionTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/CompoundConditionTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/advanced/CompoundConditionTest.java
index 55b36a7..2f92c9b 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/CompoundConditionTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/advanced/CompoundConditionTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery.advanced;
 import java.util.HashMap;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.streamquery.SelectOperator;
 import com.datatorrent.lib.streamquery.condition.CompoundCondition;
@@ -33,56 +35,58 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
  */
 public class CompoundConditionTest
 {
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
   public void testSqlSelect()
   {
-  	// create operator   
-  	SelectOperator oper = new SelectOperator();
-  	oper.addIndex(new ColumnIndex("b", null));
-  	oper.addIndex(new ColumnIndex("c", null));
-  	
-  	EqualValueCondition  left = new EqualValueCondition();
-  	left.addEqualValue("a", 1);
-  	EqualValueCondition  right = new EqualValueCondition();
+    // create operator
+    SelectOperator oper = new SelectOperator();
+    oper.addIndex(new ColumnIndex("b", null));
+    oper.addIndex(new ColumnIndex("c", null));
+
+    EqualValueCondition  left = new EqualValueCondition();
+    left.addEqualValue("a", 1);
+    EqualValueCondition  right = new EqualValueCondition();
     right.addEqualValue("b", 1);
-  	 
-  	oper.setCondition(new CompoundCondition(left, right));
-  	
-  	
-  	CollectorTestSink sink = new CollectorTestSink();
-  	oper.outport.setSink(sink);
-  	
-  	oper.setup(null);
-  	oper.beginWindow(1);
-  	
-  	HashMap<String, Object> tuple = new HashMap<String, Object>();
-  	tuple.put("a", 0);
-  	tuple.put("b", 1);
-  	tuple.put("c", 2);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 3);
-  	tuple.put("c", 4);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 5);
-  	tuple.put("c", 6);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
+
+    oper.setCondition(new CompoundCondition(left, right));
+
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
     tuple.put("a", 3);
     tuple.put("b", 7);
     tuple.put("c", 8);
     oper.inport.process(tuple);
     
-  	oper.endWindow();
-  	oper.teardown();
-  	
-  	System.out.println(sink.collectedTuples.toString());
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
   }
+
+  private static final Logger LOG = LoggerFactory.getLogger(CompoundConditionTest.class);
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/advanced/InConditionTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/InConditionTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/advanced/InConditionTest.java
index b43b6dd..d235a9c 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/InConditionTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/advanced/InConditionTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery.advanced;
 import java.util.HashMap;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.streamquery.SelectOperator;
 import com.datatorrent.lib.streamquery.condition.InCondition;
@@ -32,54 +34,57 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
  */
 public class InConditionTest
 {
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
   public void testSqlSelect()
   {
-  	// create operator   
-  	SelectOperator oper = new SelectOperator();
-  	oper.addIndex(new ColumnIndex("b", null));
-  	oper.addIndex(new ColumnIndex("c", null));
-  	
+    // create operator
+    SelectOperator oper = new SelectOperator();
+    oper.addIndex(new ColumnIndex("b", null));
+    oper.addIndex(new ColumnIndex("c", null));
+
     InCondition cond = new InCondition("a"); 
     cond.addInValue(0);
     cond.addInValue(1);
-  	oper.setCondition(cond);
-  	
-  	
-  	CollectorTestSink sink = new CollectorTestSink();
-  	oper.outport.setSink(sink);
-  	
-  	oper.setup(null);
-  	oper.beginWindow(1);
-  	
-  	HashMap<String, Object> tuple = new HashMap<String, Object>();
-  	tuple.put("a", 0);
-  	tuple.put("b", 1);
-  	tuple.put("c", 2);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 3);
-  	tuple.put("c", 4);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 2);
-  	tuple.put("b", 5);
-  	tuple.put("c", 6);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
+    oper.setCondition(cond);
+
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 2);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
     tuple.put("a", 3);
     tuple.put("b", 7);
     tuple.put("c", 8);
     oper.inport.process(tuple);
     
-  	oper.endWindow();
-  	oper.teardown();
-  	
-  	System.out.println(sink.collectedTuples.toString());
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
   }
+
+  private static final Logger LOG = LoggerFactory.getLogger(InConditionTest.class);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/advanced/LikeConditionTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/LikeConditionTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/advanced/LikeConditionTest.java
index 4d67708..b4d8539 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/LikeConditionTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/advanced/LikeConditionTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery.advanced;
 import java.util.HashMap;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.streamquery.SelectOperator;
 import com.datatorrent.lib.streamquery.condition.LikeCondition;
@@ -32,45 +34,48 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
  */
 public class LikeConditionTest
 {
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
   public void testSqlSelect()
   {
-  	// create operator   
-  	SelectOperator oper = new SelectOperator();
-  	oper.addIndex(new ColumnIndex("b", null));
-  	oper.addIndex(new ColumnIndex("c", null));
-  	
-  	LikeCondition  condition = new LikeCondition("a", "test*");
-  	oper.setCondition(condition);
-  	
-  	CollectorTestSink sink = new CollectorTestSink();
-  	oper.outport.setSink(sink);
-  	
-  	oper.setup(null);
-  	oper.beginWindow(1);
-  	
-  	HashMap<String, Object> tuple = new HashMap<String, Object>();
-  	tuple.put("a", "testing");
-  	tuple.put("b", 1);
-  	tuple.put("c", 2);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", "null");
-  	tuple.put("b", 3);
-  	tuple.put("c", 4);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", "testall");
-  	tuple.put("b", 5);
-  	tuple.put("c", 6);
-  	oper.inport.process(tuple);
-  	
-  	oper.endWindow();
-  	oper.teardown();
-  	
-  	System.out.println(sink.collectedTuples.toString());
+    // create operator
+    SelectOperator oper = new SelectOperator();
+    oper.addIndex(new ColumnIndex("b", null));
+    oper.addIndex(new ColumnIndex("c", null));
+
+    LikeCondition  condition = new LikeCondition("a", "test*");
+    oper.setCondition(condition);
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", "testing");
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", "null");
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", "testall");
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
   }
+
+  private static final Logger LOG = LoggerFactory.getLogger(LikeConditionTest.class);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/advanced/NegateIndexTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/NegateIndexTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/advanced/NegateIndexTest.java
index b81e842..3ccb03e 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/NegateIndexTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/advanced/NegateIndexTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery.advanced;
 import java.util.HashMap;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.streamquery.SelectOperator;
 import com.datatorrent.lib.streamquery.index.NegateExpression;
@@ -31,41 +33,43 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
  */
 public class NegateIndexTest
 {
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
   public void testSqlSelect()
   {
-  	// create operator   
-  	SelectOperator oper = new SelectOperator();
-  	oper.addIndex(new NegateExpression("b", null));
-  	
-  	CollectorTestSink sink = new CollectorTestSink();
-  	oper.outport.setSink(sink);
-  	
-  	oper.setup(null);
-  	oper.beginWindow(1);
-  	
-  	HashMap<String, Object> tuple = new HashMap<String, Object>();
-  	tuple.put("a", 0);
-  	tuple.put("b", 1);
-  	tuple.put("c", 2);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 3);
-  	tuple.put("c", 4);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 5);
-  	tuple.put("c", 6);
-  	oper.inport.process(tuple);
-  	
-  	oper.endWindow();
-  	oper.teardown();
-  	
-  	System.out.println(sink.collectedTuples.toString());
+    // create operator
+    SelectOperator oper = new SelectOperator();
+    oper.addIndex(new NegateExpression("b", null));
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
   }
+
+  private static final Logger LOG = LoggerFactory.getLogger(NegateIndexTest.class);
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SelectAverageTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SelectAverageTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SelectAverageTest.java
index eac0657..5279dac 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SelectAverageTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SelectAverageTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery.advanced;
 import java.util.HashMap;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.streamquery.SelectFunctionOperator;
 import com.datatorrent.lib.streamquery.function.AverageFunction;
@@ -31,41 +33,43 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
  */
 public class SelectAverageTest
 {
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
   public void testSqlSelect()
   {
-  	// create operator   
-  	SelectFunctionOperator oper = new SelectFunctionOperator();
-  	oper.addSqlFunction(new AverageFunction("b", null));
-  	
-  	CollectorTestSink sink = new CollectorTestSink();
-  	oper.outport.setSink(sink);
-  	
-  	oper.setup(null);
-  	oper.beginWindow(1);
-  	
-  	HashMap<String, Object> tuple = new HashMap<String, Object>();
-  	tuple.put("a", 0);
-  	tuple.put("b", 1);
-  	tuple.put("c", 2);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 3);
-  	tuple.put("c", 4);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 5);
-  	tuple.put("c", 6);
-  	oper.inport.process(tuple);
-  	
-  	oper.endWindow();
-  	oper.teardown();
-  	
-  	System.out.println(sink.collectedTuples.toString());
+    // create operator
+    SelectFunctionOperator oper = new SelectFunctionOperator();
+    oper.addSqlFunction(new AverageFunction("b", null));
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
   }
+
+  private static final Logger LOG = LoggerFactory.getLogger(SelectAverageTest.class);
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SelectCountTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SelectCountTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SelectCountTest.java
index 243c946..9c235e1 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SelectCountTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SelectCountTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery.advanced;
 import java.util.HashMap;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.streamquery.SelectFunctionOperator;
 import com.datatorrent.lib.streamquery.function.CountFunction;
@@ -31,41 +33,44 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
  */
 public class SelectCountTest
 {
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
   public void testSqlSelect()
   {
-  	// create operator   
-  	SelectFunctionOperator oper = new SelectFunctionOperator();
-  	oper.addSqlFunction(new CountFunction("b", null));
-  	
-  	CollectorTestSink sink = new CollectorTestSink();
-  	oper.outport.setSink(sink);
-  	
-  	oper.setup(null);
-  	oper.beginWindow(1);
-  	
-  	HashMap<String, Object> tuple = new HashMap<String, Object>();
-  	tuple.put("a", 0);
-  	tuple.put("b", null);
-  	tuple.put("c", 2);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", null);
-  	tuple.put("c", 4);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 5);
-  	tuple.put("c", 6);
-  	oper.inport.process(tuple);
-  	
-  	oper.endWindow();
-  	oper.teardown();
-  	
-  	System.out.println(sink.collectedTuples.toString());
+    // create operator
+    SelectFunctionOperator oper = new SelectFunctionOperator();
+    oper.addSqlFunction(new CountFunction("b", null));
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", null);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", null);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
   }
+
+  private static final Logger LOG = LoggerFactory.getLogger(SelectCountTest.class);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SelectFirstLastTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SelectFirstLastTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SelectFirstLastTest.java
index fe9ed07..c7b56fe 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SelectFirstLastTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SelectFirstLastTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery.advanced;
 import java.util.HashMap;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.streamquery.SelectFunctionOperator;
 import com.datatorrent.lib.streamquery.function.FirstLastFunction;
@@ -31,41 +33,44 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
  */
 public class SelectFirstLastTest
 {
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
   public void testSqlSelect()
   {
-  	// create operator   
-  	SelectFunctionOperator oper = new SelectFunctionOperator();
-  	oper.addSqlFunction(new FirstLastFunction("b", null, false));
-  	
-  	CollectorTestSink sink = new CollectorTestSink();
-  	oper.outport.setSink(sink);
-  	
-  	oper.setup(null);
-  	oper.beginWindow(1);
-  	
-  	HashMap<String, Object> tuple = new HashMap<String, Object>();
-  	tuple.put("a", 0);
-  	tuple.put("b", null);
-  	tuple.put("c", 2);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", null);
-  	tuple.put("c", 4);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 5);
-  	tuple.put("c", 6);
-  	oper.inport.process(tuple);
-  	
-  	oper.endWindow();
-  	oper.teardown();
-  	
-  	System.out.println(sink.collectedTuples.toString());
+    // create operator
+    SelectFunctionOperator oper = new SelectFunctionOperator();
+    oper.addSqlFunction(new FirstLastFunction("b", null, false));
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", null);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", null);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
   }
+
+  private static final Logger LOG = LoggerFactory.getLogger(SelectFirstLastTest.class);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SelectMaxMinTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SelectMaxMinTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SelectMaxMinTest.java
index 6b06848..e57554a 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SelectMaxMinTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SelectMaxMinTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery.advanced;
 import java.util.HashMap;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.streamquery.SelectFunctionOperator;
 import com.datatorrent.lib.streamquery.function.MaxMinFunction;
@@ -31,41 +33,44 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
  */
 public class SelectMaxMinTest
 {
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
   public void testSqlSelect()
   {
-  	// create operator   
-  	SelectFunctionOperator oper = new SelectFunctionOperator();
-  	oper.addSqlFunction(new MaxMinFunction("b", null, false));
-  	
-  	CollectorTestSink sink = new CollectorTestSink();
-  	oper.outport.setSink(sink);
-  	
-  	oper.setup(null);
-  	oper.beginWindow(1);
-  	
-  	HashMap<String, Object> tuple = new HashMap<String, Object>();
-  	tuple.put("a", 0);
-  	tuple.put("b", 1);
-  	tuple.put("c", 2);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 3);
-  	tuple.put("c", 4);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 5);
-  	tuple.put("c", 6);
-  	oper.inport.process(tuple);
-  	
-  	oper.endWindow();
-  	oper.teardown();
-  	
-  	System.out.println(sink.collectedTuples.toString());
+    // create operator
+    SelectFunctionOperator oper = new SelectFunctionOperator();
+    oper.addSqlFunction(new MaxMinFunction("b", null, false));
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
   }
+
+  private static final Logger LOG = LoggerFactory.getLogger(SelectMaxMinTest.class);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SumIndexTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SumIndexTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SumIndexTest.java
index c8e17c0..f9f1e10 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SumIndexTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/advanced/SumIndexTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery.advanced;
 import java.util.HashMap;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.streamquery.SelectOperator;
 import com.datatorrent.lib.streamquery.index.SumExpression;
@@ -31,41 +33,44 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
  */
 public class SumIndexTest
 {
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
   public void testSqlSelect()
   {
-  	// create operator   
-  	SelectOperator oper = new SelectOperator();
-  	oper.addIndex(new SumExpression("b", "c", null));
-  	
-  	CollectorTestSink sink = new CollectorTestSink();
-  	oper.outport.setSink(sink);
-  	
-  	oper.setup(null);
-  	oper.beginWindow(1);
-  	
-  	HashMap<String, Object> tuple = new HashMap<String, Object>();
-  	tuple.put("a", 0);
-  	tuple.put("b", 1);
-  	tuple.put("c", 2);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 3);
-  	tuple.put("c", 4);
-  	oper.inport.process(tuple);
-  	
-  	tuple = new HashMap<String, Object>();
-  	tuple.put("a", 1);
-  	tuple.put("b", 5);
-  	tuple.put("c", 6);
-  	oper.inport.process(tuple);
-  	
-  	oper.endWindow();
-  	oper.teardown();
-  	
-  	System.out.println(sink.collectedTuples.toString());
+    // create operator
+    SelectOperator oper = new SelectOperator();
+    oper.addIndex(new SumExpression("b", "c", null));
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
   }
+
+  private static final Logger LOG = LoggerFactory.getLogger(SumIndexTest.class);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/testbench/ActiveMQMessageGenerator.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/testbench/ActiveMQMessageGenerator.java b/library/src/test/java/com/datatorrent/lib/testbench/ActiveMQMessageGenerator.java
index 75ca86d..4f231cb 100644
--- a/library/src/test/java/com/datatorrent/lib/testbench/ActiveMQMessageGenerator.java
+++ b/library/src/test/java/com/datatorrent/lib/testbench/ActiveMQMessageGenerator.java
@@ -27,10 +27,11 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
-import org.apache.activemq.ActiveMQConnectionFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.activemq.ActiveMQConnectionFactory;
+
 /**
  * This is the message generator outside of Malhar/Hadoop. This generates data
  * and send to ActiveMQ message bus so that Malhar input adapter for ActiveMQ
@@ -39,109 +40,109 @@ import org.slf4j.LoggerFactory;
  */
 public class ActiveMQMessageGenerator
 {
-	private static final Logger logger = LoggerFactory
-			.getLogger(ActiveMQMessageGenerator.class);
-	private Connection connection;
-	private Session session;
-	private Destination destination;
-	private MessageProducer producer;
-	public HashMap<Integer, String> sendData = new HashMap<Integer, String>();
-	public int sendCount = 0;
-	private int debugMessageCount = 0;
-	private String user = "";
-	private String password = "";
-	private String url = "tcp://localhost:61617";
-	private int ackMode = Session.CLIENT_ACKNOWLEDGE;
-	private String subject = "TEST.FOO";
-	private int messageSize = 255;
-	private long maximumSendMessages = 20; // 0 means unlimitted, this has to run
-																				 // in seperate thread for unlimitted
-	private boolean topic = false;
-	private boolean transacted = false;
-	private boolean verbose = false;
-
-	public ActiveMQMessageGenerator()
-	{
-	}
-
-	public void setDebugMessageCount(int count)
-	{
-		debugMessageCount = count;
-	}
-
-	/**
-	 * Setup connection, producer, consumer so on.
-	 * 
-	 * @throws JMSException
-	 */
-	public void setupConnection() throws JMSException
-	{
-		// Create connection
-		ActiveMQConnectionFactory connectionFactory;
-		connectionFactory = new ActiveMQConnectionFactory(user, password, url);
-
-		connection = connectionFactory.createConnection();
-		connection.start();
-
-		// Create session
-		session = connection.createSession(transacted, ackMode);
-
-		// Create destination
-		destination = topic ? session.createTopic(subject) : session
-				.createQueue(subject);
-
-		// Create producer
-		producer = session.createProducer(destination);
-	}
-
-	/**
-	 * Generate message and send it to ActiveMQ message bus.
-	 * 
-	 * @throws Exception
-	 */
-	public void sendMessage() throws Exception
-	{
-		for (int i = 1; i <= maximumSendMessages || maximumSendMessages == 0; i++) {
-
-			// Silly message
-			String myMsg = "My TestMessage " + i;
-			// String myMsg = "My TestMessage " + i + " sent at " + new Date();
-
-			if (myMsg.length() > messageSize) {
-				myMsg = myMsg.substring(0, messageSize);
-			}
-
-			TextMessage message = session.createTextMessage(myMsg);
-
-			producer.send(message);
-			// store it for testing later
-			sendData.put(i, myMsg);
-			sendCount++;
-
-			if (verbose) {
-				String msg = message.getText();
-				if (msg.length() > messageSize) {
-					msg = msg.substring(0, messageSize) + "...";
-				}
-				if (i <= debugMessageCount) {
-					System.out.println("[" + this + "] Sending message from generator: '"
-							+ msg + "'");
-				}
-			}
-		}
-	}
-
-	/**
-	 * Close connection resources.
-	 */
-	public void closeConnection()
-	{
-		try {
-			producer.close();
-			session.close();
-			connection.close();
-		} catch (JMSException ex) {
-			logger.debug(ex.getLocalizedMessage());
-		}
-	}
+  private static final Logger logger = LoggerFactory
+      .getLogger(ActiveMQMessageGenerator.class);
+  private Connection connection;
+  private Session session;
+  private Destination destination;
+  private MessageProducer producer;
+  public HashMap<Integer, String> sendData = new HashMap<Integer, String>();
+  public int sendCount = 0;
+  private int debugMessageCount = 0;
+  private String user = "";
+  private String password = "";
+  private String url = "tcp://localhost:61617";
+  private int ackMode = Session.CLIENT_ACKNOWLEDGE;
+  private String subject = "TEST.FOO";
+  private int messageSize = 255;
+  private long maximumSendMessages = 20; // 0 means unlimitted, this has to run
+                                         // in seperate thread for unlimitted
+  private boolean topic = false;
+  private boolean transacted = false;
+  private boolean verbose = false;
+
+  public ActiveMQMessageGenerator()
+  {
+  }
+
+  public void setDebugMessageCount(int count)
+  {
+    debugMessageCount = count;
+  }
+
+  /**
+   * Setup connection, producer, consumer so on.
+   *
+   * @throws JMSException
+   */
+  public void setupConnection() throws JMSException
+  {
+    // Create connection
+    ActiveMQConnectionFactory connectionFactory;
+    connectionFactory = new ActiveMQConnectionFactory(user, password, url);
+
+    connection = connectionFactory.createConnection();
+    connection.start();
+
+    // Create session
+    session = connection.createSession(transacted, ackMode);
+
+    // Create destination
+    destination = topic ? session.createTopic(subject) : session
+        .createQueue(subject);
+
+    // Create producer
+    producer = session.createProducer(destination);
+  }
+
+  /**
+   * Generate message and send it to ActiveMQ message bus.
+   *
+   * @throws Exception
+   */
+  public void sendMessage() throws Exception
+  {
+    for (int i = 1; i <= maximumSendMessages || maximumSendMessages == 0; i++) {
+
+      // Silly message
+      String myMsg = "My TestMessage " + i;
+      // String myMsg = "My TestMessage " + i + " sent at " + new Date();
+
+      if (myMsg.length() > messageSize) {
+        myMsg = myMsg.substring(0, messageSize);
+      }
+
+      TextMessage message = session.createTextMessage(myMsg);
+
+      producer.send(message);
+      // store it for testing later
+      sendData.put(i, myMsg);
+      sendCount++;
+
+      if (verbose) {
+        String msg = message.getText();
+        if (msg.length() > messageSize) {
+          msg = msg.substring(0, messageSize) + "...";
+        }
+        if (i <= debugMessageCount) {
+          System.out.println("[" + this + "] Sending message from generator: '"
+              + msg + "'");
+        }
+      }
+    }
+  }
+
+  /**
+   * Close connection resources.
+   */
+  public void closeConnection()
+  {
+    try {
+      producer.close();
+      session.close();
+      connection.close();
+    } catch (JMSException ex) {
+      logger.debug(ex.getLocalizedMessage());
+    }
+  }
 }
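
The import hunk at the top of ActiveMQMessageGenerator.java, like the one in EventGeneratorTest.java further down, reorders imports into the grouping that the module's checkstyle configuration appears to enforce: java/javax first, then other third-party packages such as org.slf4j, then org.apache, then the project's com.datatorrent packages, each group separated by a blank line. The ordering below is inferred from these diffs rather than taken from the checkstyle file itself, and the class is invented for illustration:

    import java.util.HashMap;                               // 1. java / javax

    import org.slf4j.Logger;                                // 2. other third-party
    import org.slf4j.LoggerFactory;

    import org.apache.activemq.ActiveMQConnectionFactory;   // 3. org.apache

    import com.datatorrent.api.Sink;                        // 4. project packages

    public class ImportOrderExample
    {
      private static final Logger LOG = LoggerFactory.getLogger(ImportOrderExample.class);
      private final HashMap<String, Sink<?>> sinks = new HashMap<String, Sink<?>>();
      private ActiveMQConnectionFactory factory;
    }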

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/testbench/EventClassifierTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/testbench/EventClassifierTest.java b/library/src/test/java/com/datatorrent/lib/testbench/EventClassifierTest.java
index 2eb1219..e9b4873 100644
--- a/library/src/test/java/com/datatorrent/lib/testbench/EventClassifierTest.java
+++ b/library/src/test/java/com/datatorrent/lib/testbench/EventClassifierTest.java
@@ -40,217 +40,217 @@ import com.datatorrent.api.Sink;
  * <br>
  * Validates all DRC checks of the node<br>
  */
-public class EventClassifierTest {
+public class EventClassifierTest
+{
 
-    private static Logger LOG = LoggerFactory.getLogger(EventClassifier.class);
+  private static Logger LOG = LoggerFactory.getLogger(EventClassifier.class);
 
-    @SuppressWarnings("rawtypes")
-    class TestSink implements Sink {
+  @SuppressWarnings("rawtypes")
+  class TestSink implements Sink
+  {
 
-        HashMap<String, Integer> collectedTuples = new HashMap<String, Integer>();
-        HashMap<String, Double> collectedTupleValues = new HashMap<String, Double>();
+    HashMap<String, Integer> collectedTuples = new HashMap<String, Integer>();
+    HashMap<String, Double> collectedTupleValues = new HashMap<String, Double>();
 
-        int count = 0;
-        boolean dohash = true;
+    int count = 0;
+    boolean dohash = true;
 
-        /**
-         *
-         * @param payload
-         */
-        @SuppressWarnings("unchecked")
-        @Override
-        public void put(Object payload) {
-          count++;
-          if (dohash) {
-            HashMap<String, Double> tuple = (HashMap<String, Double>)payload;
-            for (Map.Entry<String, Double> e : tuple.entrySet()) {
-              Integer ival = collectedTuples.get(e.getKey());
-              if (ival == null) {
-                ival = new Integer(1);
-              }
-              else {
-                ival = ival + 1;
-              }
-              collectedTuples.put(e.getKey(), ival);
-              collectedTupleValues.put(e.getKey(), e.getValue());
-            }
+    /**
+     *
+     * @param payload
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void put(Object payload)
+    {
+      count++;
+      if (dohash) {
+        HashMap<String, Double> tuple = (HashMap<String, Double>)payload;
+        for (Map.Entry<String, Double> e : tuple.entrySet()) {
+          Integer ival = collectedTuples.get(e.getKey());
+          if (ival == null) {
+            ival = 1;
+          } else {
+            ival = ival + 1;
           }
+          collectedTuples.put(e.getKey(), ival);
+          collectedTupleValues.put(e.getKey(), e.getValue());
         }
-        /**
-         *
-         */
-        public void clear() {
-            collectedTuples.clear();
-            collectedTupleValues.clear();
-            count = 0;
-        }
+      }
+    }
+
+    /**
+     *
+     */
+    public void clear()
+    {
+      collectedTuples.clear();
+      collectedTupleValues.clear();
+      count = 0;
+    }
 
     @Override
     public int getCount(boolean reset)
     {
       throw new UnsupportedOperationException("Not supported yet.");
     }
-    }
+  }
 
-    /**
-     * Test node logic emits correct results
-     */
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testNodeProcessing() throws Exception
-    {
+  /**
+   * Test node logic emits correct results
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
 
-      EventClassifier node = new EventClassifier();
-      TestSink classifySink = new TestSink();
-      classifySink.dohash = true;
-      node.data.setSink(classifySink);
+    EventClassifier node = new EventClassifier();
+    TestSink classifySink = new TestSink();
+    classifySink.dohash = true;
+    node.data.setSink(classifySink);
 
-      HashMap<String, Double> keymap = new HashMap<String, Double>();
-      keymap.put("a", 1.0);
-      keymap.put("b", 4.0);
-      keymap.put("c", 5.0);
-      node.setKeyMap(keymap);
-      node.setOperationReplace();
+    HashMap<String, Double> keymap = new HashMap<String, Double>();
+    keymap.put("a", 1.0);
+    keymap.put("b", 4.0);
+    keymap.put("c", 5.0);
+    node.setKeyMap(keymap);
+    node.setOperationReplace();
 
-      int numTuples = 1000;
+    int numTuples = 1000;
 
-      HashMap<String, ArrayList<Integer>> wmap = new HashMap<String, ArrayList<Integer>>();
-      ArrayList<Integer> list = new ArrayList<Integer>(3);
-      list.add(60);
-      list.add(10);
-      list.add(35);
-      wmap.put("ia", list);
-      list = new ArrayList<Integer>(3);
-      list.add(10);
-      list.add(75);
-      list.add(15);
-      wmap.put("ib", list);
-      list = new ArrayList<Integer>(3);
-      list.add(20);
-      list.add(10);
-      list.add(70);
-      wmap.put("ic", list);
-      list = new ArrayList<Integer>(3);
-      list.add(50);
-      list.add(15);
-      list.add(35);
-      wmap.put("id", list);
-      node.setKeyWeights(wmap);
-      node.setup(null);
+    HashMap<String, ArrayList<Integer>> wmap = new HashMap<String, ArrayList<Integer>>();
+    ArrayList<Integer> list = new ArrayList<Integer>(3);
+    list.add(60);
+    list.add(10);
+    list.add(35);
+    wmap.put("ia", list);
+    list = new ArrayList<Integer>(3);
+    list.add(10);
+    list.add(75);
+    list.add(15);
+    wmap.put("ib", list);
+    list = new ArrayList<Integer>(3);
+    list.add(20);
+    list.add(10);
+    list.add(70);
+    wmap.put("ic", list);
+    list = new ArrayList<Integer>(3);
+    list.add(50);
+    list.add(15);
+    list.add(35);
+    wmap.put("id", list);
+    node.setKeyWeights(wmap);
+    node.setup(null);
 
-      HashMap<String, Double> input = new HashMap<String, Double>();
-      int sentval = 0;
-      for (int i = 0; i < numTuples; i++) {
-        input.clear();
-        input.put("ia", 2.0);
-        input.put("ib", 20.0);
-        input.put("ic", 1000.0);
-        input.put("id", 1000.0);
-        sentval += 4;
-        node.event.process(input);
-      }
-      node.endWindow();
-      int ival = 0;
-      if (classifySink.dohash) {
-        for (Map.Entry<String, Integer> e: classifySink.collectedTuples.entrySet()) {
-          ival += e.getValue().intValue();
-        }
-      }
-      else {
-        ival = classifySink.count;
+    HashMap<String, Double> input = new HashMap<String, Double>();
+    int sentval = 0;
+    for (int i = 0; i < numTuples; i++) {
+      input.clear();
+      input.put("ia", 2.0);
+      input.put("ib", 20.0);
+      input.put("ic", 1000.0);
+      input.put("id", 1000.0);
+      sentval += 4;
+      node.event.process(input);
+    }
+    node.endWindow();
+    int ival = 0;
+    if (classifySink.dohash) {
+      for (Map.Entry<String, Integer> e : classifySink.collectedTuples.entrySet()) {
+        ival += e.getValue().intValue();
       }
+    } else {
+      ival = classifySink.count;
+    }
 
-      LOG.info(String.format("\nThe number of keys in %d tuples are %d and %d",
-                             ival,
-                             classifySink.collectedTuples.size(),
-                             classifySink.collectedTupleValues.size()));
-      for (Map.Entry<String, Double> ve: classifySink.collectedTupleValues.entrySet()) {
-        Integer ieval = classifySink.collectedTuples.get(ve.getKey()); // ieval should not be null?
-        LOG.info(String.format("%d tuples of key \"%s\" has value %f", ieval.intValue(), ve.getKey(), ve.getValue()));
-      }
-      Assert.assertEquals("number emitted tuples", sentval, ival);
+    LOG.info(String.format("\nThe number of keys in %d tuples are %d and %d",
+        ival,
+        classifySink.collectedTuples.size(),
+        classifySink.collectedTupleValues.size()));
+    for (Map.Entry<String, Double> ve : classifySink.collectedTupleValues.entrySet()) {
+      Integer ieval = classifySink.collectedTuples.get(ve.getKey()); // ieval should not be null?
+      LOG.info(String.format("%d tuples of key \"%s\" has value %f", ieval.intValue(), ve.getKey(), ve.getValue()));
+    }
+    Assert.assertEquals("number emitted tuples", sentval, ival);
 
-      // Now test a node with no weights
-      EventClassifier nwnode = new EventClassifier();
-      classifySink.clear();
-      nwnode.data.setSink(classifySink);
-      nwnode.setKeyMap(keymap);
-      nwnode.setOperationReplace();
-      nwnode.setup(null);
+    // Now test a node with no weights
+    EventClassifier nwnode = new EventClassifier();
+    classifySink.clear();
+    nwnode.data.setSink(classifySink);
+    nwnode.setKeyMap(keymap);
+    nwnode.setOperationReplace();
+    nwnode.setup(null);
 
-      sentval = 0;
-      for (int i = 0; i < numTuples; i++) {
-        input.clear();
-        input.put("ia", 2.0);
-        input.put("ib", 20.0);
-        input.put("ic", 1000.0);
-        input.put("id", 1000.0);
-        sentval += 4;
-        nwnode.event.process(input);
-      }
-      nwnode.endWindow();
-      ival = 0;
-      if (classifySink.dohash) {
-        for (Map.Entry<String, Integer> e: classifySink.collectedTuples.entrySet()) {
-          ival += e.getValue().intValue();
-        }
-      }
-      else {
-        ival = classifySink.count;
-      }
-      LOG.info(String.format("\nThe number of keys in %d tuples are %d and %d",
-                             ival,
-                             classifySink.collectedTuples.size(),
-                             classifySink.collectedTupleValues.size()));
-      for (Map.Entry<String, Double> ve: classifySink.collectedTupleValues.entrySet()) {
-        Integer ieval = classifySink.collectedTuples.get(ve.getKey()); // ieval should not be null?
-        LOG.info(String.format("%d tuples of key \"%s\" has value %f", ieval.intValue(), ve.getKey(), ve.getValue()));
+    sentval = 0;
+    for (int i = 0; i < numTuples; i++) {
+      input.clear();
+      input.put("ia", 2.0);
+      input.put("ib", 20.0);
+      input.put("ic", 1000.0);
+      input.put("id", 1000.0);
+      sentval += 4;
+      nwnode.event.process(input);
+    }
+    nwnode.endWindow();
+    ival = 0;
+    if (classifySink.dohash) {
+      for (Map.Entry<String, Integer> e : classifySink.collectedTuples.entrySet()) {
+        ival += e.getValue().intValue();
       }
-      Assert.assertEquals("number emitted tuples", sentval, ival);
-
+    } else {
+      ival = classifySink.count;
+    }
+    LOG.info(String.format("\nThe number of keys in %d tuples are %d and %d",
+        ival,
+        classifySink.collectedTuples.size(),
+        classifySink.collectedTupleValues.size()));
+    for (Map.Entry<String, Double> ve : classifySink.collectedTupleValues.entrySet()) {
+      Integer ieval = classifySink.collectedTuples.get(ve.getKey()); // ieval should not be null?
+      LOG.info(String.format("%d tuples of key \"%s\" has value %f", ieval.intValue(), ve.getKey(), ve.getValue()));
+    }
+    Assert.assertEquals("number emitted tuples", sentval, ival);
 
-      // Now test a node with no weights and no values
-      EventClassifier nvnode = new EventClassifier();
-      classifySink.clear();
-      keymap.put("a", 0.0);
-      keymap.put("b", 0.0);
-      keymap.put("c", 0.0);
+    // Now test a node with no weights and no values
+    EventClassifier nvnode = new EventClassifier();
+    classifySink.clear();
+    keymap.put("a", 0.0);
+    keymap.put("b", 0.0);
+    keymap.put("c", 0.0);
 
-      nvnode.data.setSink(classifySink);
-      nvnode.setKeyMap(keymap);
-      nvnode.setOperationReplace();
-      nvnode.setup(null);
+    nvnode.data.setSink(classifySink);
+    nvnode.setKeyMap(keymap);
+    nvnode.setOperationReplace();
+    nvnode.setup(null);
 
-      sentval = 0;
-      for (int i = 0; i < numTuples; i++) {
-        input.clear();
-        input.put("ia", 2.0);
-        input.put("ib", 20.0);
-        input.put("ic", 500.0);
-        input.put("id", 1000.0);
-        sentval += 4;
-        nvnode.event.process(input);
-      }
+    sentval = 0;
+    for (int i = 0; i < numTuples; i++) {
+      input.clear();
+      input.put("ia", 2.0);
+      input.put("ib", 20.0);
+      input.put("ic", 500.0);
+      input.put("id", 1000.0);
+      sentval += 4;
+      nvnode.event.process(input);
+    }
     nvnode.endWindow();
     ival = 0;
     if (classifySink.dohash) {
-      for (Map.Entry<String, Integer> e: classifySink.collectedTuples.entrySet()) {
+      for (Map.Entry<String, Integer> e : classifySink.collectedTuples.entrySet()) {
         ival += e.getValue().intValue();
       }
-    }
-    else {
+    } else {
       ival = classifySink.count;
     }
     LOG.info(String.format("\nThe number of keys in %d tuples are %d and %d",
-                           ival,
-                           classifySink.collectedTuples.size(),
-                           classifySink.collectedTupleValues.size()));
-    for (Map.Entry<String, Double> ve: classifySink.collectedTupleValues.entrySet()) {
+        ival,
+        classifySink.collectedTuples.size(),
+        classifySink.collectedTupleValues.size()));
+    for (Map.Entry<String, Double> ve : classifySink.collectedTupleValues.entrySet()) {
       Integer ieval = classifySink.collectedTuples.get(ve.getKey()); // ieval should not be null?
       LOG.info(String.format("%d tuples of key \"%s\" has value %f",
-                             ieval.intValue(),
-                             ve.getKey(),
-                             ve.getValue()));
+          ieval.intValue(),
+          ve.getKey(),
+          ve.getValue()));
     }
     Assert.assertEquals("number emitted tuples", sentval, ival);
   }
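
Beyond the indentation changes, the EventClassifierTest hunks above also normalize brace placement: class and method opening braces move to their own line, while "else" joins the closing brace of the preceding "if" block. A short sketch of that layout, using an invented helper method rather than code from the commit:

    public class BraceStyleExample
    {
      // "} else {" stays on one line; type and method braces open on their own line.
      public int incrementCount(Integer current)
      {
        int next;
        if (current == null) {
          next = 1;
        } else {
          next = current + 1;
        }
        return next;
      }
    }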

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/testbench/EventGeneratorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/testbench/EventGeneratorTest.java b/library/src/test/java/com/datatorrent/lib/testbench/EventGeneratorTest.java
index f064845..47e209c 100644
--- a/library/src/test/java/com/datatorrent/lib/testbench/EventGeneratorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/testbench/EventGeneratorTest.java
@@ -18,14 +18,16 @@
  */
 package com.datatorrent.lib.testbench;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-
 import java.util.HashMap;
 import java.util.Map;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.common.util.BaseOperator;
+
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -93,26 +95,26 @@ public class EventGeneratorTest
   }
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
-	public void testSingleSchemaNodeProcessing(boolean stringschema) throws Exception
+  public void testSingleSchemaNodeProcessing(boolean stringschema) throws Exception
   {
-  	EventGenerator node = new EventGenerator();
+    EventGenerator node = new EventGenerator();
     node.setKeysHelper("a,b,c,d");
     node.setValuesHelper("");
     node.setWeightsHelper("10,40,20,30");
-  	CollectorTestSink count = new CollectorTestSink();
-  	node.count.setSink(count);
-  	CollectorTestSink data = new CollectorTestSink();
-  	node.string_data.setSink(data);
-  	CollectorTestSink hashData = new CollectorTestSink();
-  	node.hash_data.setSink(hashData);
+    CollectorTestSink count = new CollectorTestSink();
+    node.count.setSink(count);
+    CollectorTestSink data = new CollectorTestSink();
+    node.string_data.setSink(data);
+    CollectorTestSink hashData = new CollectorTestSink();
+    node.hash_data.setSink(hashData);
 
-  	node.setup(null);
-  	node.beginWindow(1);
-  	node.emitTuples();
-  	node.endWindow();
-  	node.teardown();
+    node.setup(null);
+    node.beginWindow(1);
+    node.emitTuples();
+    node.endWindow();
+    node.teardown();
 
-  	assertTrue("Default number of tuples generated", 10000 == data.collectedTuples.size());
+    assertTrue("Default number of tuples generated", 10000 == data.collectedTuples.size());
 
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/testbench/EventIncrementerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/testbench/EventIncrementerTest.java b/library/src/test/java/com/datatorrent/lib/testbench/EventIncrementerTest.java
index 1c120f3..0e9fc4d 100644
--- a/library/src/test/java/com/datatorrent/lib/testbench/EventIncrementerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/testbench/EventIncrementerTest.java
@@ -18,15 +18,16 @@
  */
 package com.datatorrent.lib.testbench;
 
-import com.datatorrent.api.Sink;
-import com.datatorrent.lib.testbench.EventIncrementer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.Sink;
+
 /**
  * Functional test for {@link com.datatorrent.lib.testbench.EventIncrementer}<p>
  * <br>
@@ -139,19 +140,20 @@ public class EventIncrementerTest
     int numtuples = 1000;
     String seed1 = "a";
     ArrayList val = new ArrayList();
-    val.add(new Integer(10));
-    val.add(new Integer(20));
+    val.add(10);
+    val.add(20);
     stuple.put(seed1, val);
     for (int i = 0; i < numtuples; i++) {
       seedSink.put(stuple);
     }
     oper.endWindow();
 
-    LOG.debug(String.format("\n*************************\nEmitted %d tuples, Processed %d tuples, Received %d tuples\n******************\n",
-                            numtuples,
-                            oper.tuple_count,
-                            dataSink.count));
-    for (Map.Entry<String, String> e: dataSink.collectedTuples.entrySet()) {
+    LOG.debug(String.format(
+        "\n*************************\nEmitted %d tuples, Processed %d tuples, Received %d tuples\n******************\n",
+        numtuples,
+        oper.tuple_count,
+        dataSink.count));
+    for (Map.Entry<String, String> e : dataSink.collectedTuples.entrySet()) {
       LOG.debug(String.format("Got key (%s) and value (%s)", e.getKey(), e.getValue()));
     }
 
@@ -159,12 +161,12 @@ public class EventIncrementerTest
 
     HashMap<String, Object> ixtuple = new HashMap<String, Object>(1);
     HashMap<String, Integer> ixval = new HashMap<String, Integer>(1);
-    ixval.put("x", new Integer(10));
+    ixval.put("x", 10);
     ixtuple.put("a", ixval);
 
     HashMap<String, Object> iytuple = new HashMap<String, Object>(1);
     HashMap<String, Integer> iyval = new HashMap<String, Integer>(1);
-    iyval.put("y", new Integer(10));
+    iyval.put("y", 10);
     iytuple.put("a", iyval);
 
     for (int i = 0; i < numtuples; i++) {
@@ -174,11 +176,12 @@ public class EventIncrementerTest
 
     oper.endWindow();
 
-    LOG.debug(String.format("\n*************************\nEmitted %d tuples, Processed %d tuples, Received %d tuples\n******************\n",
-                            numtuples*2,
-                            oper.tuple_count,
-                            countSink.count));
-     for (Map.Entry<String, String> e: dataSink.collectedTuples.entrySet()) {
+    LOG.debug(String.format(
+        "\n*************************\nEmitted %d tuples, Processed %d tuples, Received %d tuples\n******************\n",
+        numtuples * 2,
+        oper.tuple_count,
+        countSink.count));
+    for (Map.Entry<String, String> e : dataSink.collectedTuples.entrySet()) {
       LOG.debug(String.format("Got key (%s) and value (%s)", e.getKey(), e.getValue()));
     }
   }
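
  Besides the reindented log calls, this hunk drops the boxed-constructor calls. A minimal sketch of the equivalent forms, assuming nothing beyond the JDK (BoxingSketch is a hypothetical class name):

    import java.util.ArrayList;
    import java.util.List;

    public class BoxingSketch
    {
      public static void main(String[] args)
      {
        List<Integer> val = new ArrayList<>();
        val.add(10);                   // autoboxed, equivalent to Integer.valueOf(10)
        val.add(Integer.valueOf(20));  // explicit boxing, reuses cached small values
        // val.add(new Integer(30));   // always allocates; this constructor is deprecated since Java 9
        System.out.println(val);       // prints [10, 20]
      }
    }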

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/testbench/FilteredEventClassifierTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/testbench/FilteredEventClassifierTest.java b/library/src/test/java/com/datatorrent/lib/testbench/FilteredEventClassifierTest.java
index 745c515..57c8829 100644
--- a/library/src/test/java/com/datatorrent/lib/testbench/FilteredEventClassifierTest.java
+++ b/library/src/test/java/com/datatorrent/lib/testbench/FilteredEventClassifierTest.java
@@ -18,15 +18,16 @@
  */
 package com.datatorrent.lib.testbench;
 
-import com.datatorrent.api.Sink;
-import com.datatorrent.lib.testbench.FilteredEventClassifier;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.Sink;
+
 
 /**
  * Functional test for {@link com.datatorrent.lib.testbench.FilteredEventClassifier} for three configuration><p>
@@ -61,9 +62,8 @@ public class FilteredEventClassifierTest
       for (Map.Entry<String, Double> e : tuple.entrySet()) {
         Integer ival = collectedTuples.get(e.getKey());
         if (ival == null) {
-          ival = new Integer(1);
-        }
-        else {
+          ival = 1;
+        } else {
           ival = ival + 1;
         }
         collectedTuples.put(e.getKey(), ival);
@@ -162,14 +162,15 @@ public class FilteredEventClassifierTest
       ival += e.getValue().intValue();
     }
 
-    LOG.info(String.format("\n*******************************************************\nFiltered %d out of %d intuples with %d and %d unique keys",
-                           ival,
-                           sentval,
-                           classifySink.collectedTuples.size(),
-                           classifySink.collectedTupleValues.size()));
-    for (Map.Entry<String, Double> ve: classifySink.collectedTupleValues.entrySet()) {
+    LOG.info(String.format(
+        "\n*******************************************************\nFiltered %d out of %d intuples with %d and %d " + "unique keys",
+        ival,
+        sentval,
+        classifySink.collectedTuples.size(),
+        classifySink.collectedTupleValues.size()));
+    for (Map.Entry<String, Double> ve : classifySink.collectedTupleValues.entrySet()) {
       Integer ieval = classifySink.collectedTuples.get(ve.getKey()); // ieval should not be null?
-      LOG.info(String.format("%d tuples of key \"%s\" has value %f", ieval.intValue(), ve.getKey(), ve.getValue()));
+      LOG.info(String.format("%d tuples of key \"%s\" has value %f", ieval, ve.getKey(), ve.getValue()));
     }
 
     // Now test a node with no weights
@@ -207,12 +208,13 @@ public class FilteredEventClassifierTest
       ival += e.getValue().intValue();
     }
 
-    LOG.info(String.format("\n*******************************************************\nFiltered %d out of %d intuples with %d and %d unique keys",
-                           ival,
-                           sentval,
-                           classifySink.collectedTuples.size(),
-                           classifySink.collectedTupleValues.size()));
-    for (Map.Entry<String, Double> ve: classifySink.collectedTupleValues.entrySet()) {
+    LOG.info(String.format(
+        "\n*******************************************************\nFiltered %d out of %d intuples with %d and %d " + "unique keys",
+        ival,
+        sentval,
+        classifySink.collectedTuples.size(),
+        classifySink.collectedTupleValues.size()));
+    for (Map.Entry<String, Double> ve : classifySink.collectedTupleValues.entrySet()) {
       Integer ieval = classifySink.collectedTuples.get(ve.getKey()); // ieval should not be null?
       LOG.info(String.format("%d tuples of key \"%s\" has value %f", ieval.intValue(), ve.getKey(), ve.getValue()));
     }
@@ -252,21 +254,22 @@ public class FilteredEventClassifierTest
     }
     nvnode.endWindow();
     ival = 0;
-    for (Map.Entry<String, Integer> e: classifySink.collectedTuples.entrySet()) {
-      ival += e.getValue().intValue();
+    for (Map.Entry<String, Integer> e : classifySink.collectedTuples.entrySet()) {
+      ival += e.getValue();
     }
-    LOG.info(String.format("\n*******************************************************\nFiltered %d out of %d intuples with %d and %d unique keys",
-                           ival,
-                           sentval,
-                           classifySink.collectedTuples.size(),
-                           classifySink.collectedTupleValues.size()));
+    LOG.info(String.format(
+        "\n*******************************************************\nFiltered %d out of %d intuples with %d and %d " + "unique keys",
+        ival,
+        sentval,
+        classifySink.collectedTuples.size(),
+        classifySink.collectedTupleValues.size()));
 
-    for (Map.Entry<String, Double> ve: classifySink.collectedTupleValues.entrySet()) {
+    for (Map.Entry<String, Double> ve : classifySink.collectedTupleValues.entrySet()) {
       Integer ieval = classifySink.collectedTuples.get(ve.getKey()); // ieval should not be null?
       LOG.info(String.format("%d tuples of key \"%s\" has value %f",
-                             ieval.intValue(),
-                             ve.getKey(),
-                             ve.getValue()));
+          ieval,
+          ve.getKey(),
+          ve.getValue()));
     }
   }
 }
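
  Several hunks in this file also move else onto the same line as the closing brace. A minimal sketch of that layout, using a hypothetical counter map rather than the operator's sinks:

    import java.util.HashMap;
    import java.util.Map;

    public class BraceStyleSketch
    {
      public static void main(String[] args)
      {
        Map<String, Integer> collectedTuples = new HashMap<>();
        String key = "a";
        Integer ival = collectedTuples.get(key);
        if (ival == null) {
          ival = 1;          // first time this key is seen
        } else {
          ival = ival + 1;   // increment the existing count
        }
        collectedTuples.put(key, ival);
        System.out.println(collectedTuples);  // prints {a=1}
      }
    }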

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/testbench/RandomEventGeneratorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/testbench/RandomEventGeneratorTest.java b/library/src/test/java/com/datatorrent/lib/testbench/RandomEventGeneratorTest.java
index 5c692c4..6876525 100644
--- a/library/src/test/java/com/datatorrent/lib/testbench/RandomEventGeneratorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/testbench/RandomEventGeneratorTest.java
@@ -65,8 +65,8 @@ public class RandomEventGeneratorTest
     testSchemaNodeProcessing();
   }
 
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-	public void testSchemaNodeProcessing() throws Exception
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testSchemaNodeProcessing() throws Exception
   {
     RandomEventGenerator node = new RandomEventGenerator();
     node.setMinvalue(0);
@@ -82,7 +82,7 @@ public class RandomEventGeneratorTest
     node.emitTuples();
     node.endWindow();
     node.teardown();
-    assertTrue("tuple blast" , integer_data.collectedTuples.size() == 5000);
-    assertTrue("tuple blast" , string_data.collectedTuples.size() == 5000);
+    assertTrue("tuple blast", integer_data.collectedTuples.size() == 5000);
+    assertTrue("tuple blast", string_data.collectedTuples.size() == 5000);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/testbench/SeedEventClassifierTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/testbench/SeedEventClassifierTest.java b/library/src/test/java/com/datatorrent/lib/testbench/SeedEventClassifierTest.java
index eb3c961..dd8d346 100644
--- a/library/src/test/java/com/datatorrent/lib/testbench/SeedEventClassifierTest.java
+++ b/library/src/test/java/com/datatorrent/lib/testbench/SeedEventClassifierTest.java
@@ -99,8 +99,7 @@ public class SeedEventClassifierTest
     Sink inSink2 = oper.data2.getSink();
     if (isstring) {
       oper.string_data.setSink(classifySink);
-    }
-    else {
+    } else {
       oper.hash_data.setSink(hashSink);
     }
 
@@ -122,12 +121,11 @@ public class SeedEventClassifierTest
           inSink2.put(input);
         }
       }
-    }
-    else {
+    } else {
       Integer input;
       for (int j = 0; j < 5; j++) {
         for (int i = 0; i < numTuples; i++) {
-          input = new Integer(i);
+          input = i;
           inSink1.put(input);
           inSink2.put(input);
         }
@@ -137,14 +135,13 @@ public class SeedEventClassifierTest
     if (isstring) {
       Assert.assertEquals("number emitted tuples", numTuples * 2 * 5, classifySink.count);
       LOG.debug(String.format("\n********************\nProcessed %d tuples with %d uniques\n********************\n",
-                              classifySink.count,
-                              classifySink.collectedTuples.size()));
-    }
-    else {
+          classifySink.count,
+          classifySink.collectedTuples.size()));
+    } else {
       Assert.assertEquals("number emitted tuples", numTuples * 2 * 5, hashSink.count);
       LOG.debug(String.format("\n********************\nProcessed %d tuples with %d uniques\n********************\n",
-                              hashSink.count,
-                              hashSink.collectedTuples.size()));
+          hashSink.count,
+          hashSink.collectedTuples.size()));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/testbench/SeedEventGeneratorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/testbench/SeedEventGeneratorTest.java b/library/src/test/java/com/datatorrent/lib/testbench/SeedEventGeneratorTest.java
index 80c75d6..cdafecf 100644
--- a/library/src/test/java/com/datatorrent/lib/testbench/SeedEventGeneratorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/testbench/SeedEventGeneratorTest.java
@@ -18,11 +18,10 @@
  */
 package com.datatorrent.lib.testbench;
 
-import com.datatorrent.api.Operator.ShutdownException;
-import com.datatorrent.lib.testbench.SeedEventGenerator;
-
 import org.junit.Test;
 
+import com.datatorrent.api.Operator.ShutdownException;
+
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -50,7 +49,7 @@ public class SeedEventGeneratorTest
   }
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
-	public void testSchemaNodeProcessing(boolean doseedkey) throws Exception
+  public void testSchemaNodeProcessing(boolean doseedkey) throws Exception
   {
     SeedEventGenerator node = new SeedEventGenerator();
     if (doseedkey) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/testbench/ThroughputCounterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/testbench/ThroughputCounterTest.java b/library/src/test/java/com/datatorrent/lib/testbench/ThroughputCounterTest.java
index 8fb8a3e..2becda7 100644
--- a/library/src/test/java/com/datatorrent/lib/testbench/ThroughputCounterTest.java
+++ b/library/src/test/java/com/datatorrent/lib/testbench/ThroughputCounterTest.java
@@ -18,13 +18,14 @@
  */
 package com.datatorrent.lib.testbench;
 
-import com.datatorrent.api.Sink;
-import com.datatorrent.lib.testbench.ThroughputCounter;
 import java.util.HashMap;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.Sink;
+
 /**
  *
  * Functional tests for {@link com.datatorrent.lib.testbench.ThroughputCounter}. <p>
@@ -38,9 +39,10 @@ import org.slf4j.LoggerFactory;
  * DRC checks are validated<br>
  *
  */
-public class ThroughputCounterTest {
+public class ThroughputCounterTest
+{
 
-    private static Logger log = LoggerFactory.getLogger(ThroughputCounterTest.class);
+  private static Logger log = LoggerFactory.getLogger(ThroughputCounterTest.class);
 
   @SuppressWarnings("rawtypes")
   class TestCountSink implements Sink
@@ -56,8 +58,8 @@ public class ThroughputCounterTest {
     public void put(Object payload)
     {
       HashMap<String, Number> tuples = (HashMap<String, Number>)payload;
-      average = ((Long)tuples.get(ThroughputCounter.OPORT_COUNT_TUPLE_AVERAGE)).longValue();
-      count += ((Long)tuples.get(ThroughputCounter.OPORT_COUNT_TUPLE_COUNT)).longValue();
+      average = (Long)tuples.get(ThroughputCounter.OPORT_COUNT_TUPLE_AVERAGE);
+      count += (Long)tuples.get(ThroughputCounter.OPORT_COUNT_TUPLE_COUNT);
     }
 
     @Override
@@ -85,8 +87,8 @@ public class ThroughputCounterTest {
     HashMap<String, Integer> input;
     int aint = 1000;
     int bint = 100;
-    Integer aval = new Integer(aint);
-    Integer bval = new Integer(bint);
+    Integer aval = aint;
+    Integer bval = bint;
     long ntot = aint + bint;
     long numtuples = 1000;
     long sentval = 0;
@@ -99,10 +101,11 @@ public class ThroughputCounterTest {
     }
     node.endWindow();
 
-    log.info(String.format("\n*******************************************************\nGot average per sec(%d), count(got %d, expected %d), numtuples(%d)",
-                           countSink.average,
-                           countSink.count,
-                           ntot * numtuples,
-                           sentval));
+    log.info(String.format(
+        "\n*******************************************************\nGot average per sec(%d), count(got %d, expected " + "%d), numtuples(%d)",
+        countSink.average,
+        countSink.count,
+        ntot * numtuples,
+        sentval));
   }
 }
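
  The cast-and-unbox change in the TestCountSink above relies on auto-unboxing supplying the longValue() call. A small self-contained sketch, with hypothetical key names in place of the ThroughputCounter constants:

    import java.util.HashMap;
    import java.util.Map;

    public class UnboxingSketch
    {
      public static void main(String[] args)
      {
        Map<String, Number> tuples = new HashMap<>();
        tuples.put("avg", 42L);
        tuples.put("count", 7L);

        long average = (Long)tuples.get("avg");     // cast to Long, unboxing does the rest
        long count = 0;
        count += (Long)tuples.get("count");         // same for the accumulating counter
        System.out.println(average + " " + count);  // prints 42 7
      }
    }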

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/transform/TransformOperatorAppTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/transform/TransformOperatorAppTest.java b/library/src/test/java/com/datatorrent/lib/transform/TransformOperatorAppTest.java
index fc8040b..643fb04 100644
--- a/library/src/test/java/com/datatorrent/lib/transform/TransformOperatorAppTest.java
+++ b/library/src/test/java/com/datatorrent/lib/transform/TransformOperatorAppTest.java
@@ -55,7 +55,8 @@ public class TransformOperatorAppTest
 
   public static class Application implements StreamingApplication
   {
-    @Override public void populateDAG(DAG dag, Configuration configuration)
+    @Override
+    public void populateDAG(DAG dag, Configuration configuration)
     {
       DummyInputGenerator input = dag.addOperator("Input", new DummyInputGenerator());
       TransformOperator transform = dag.addOperator("Transform", new TransformOperator());
@@ -106,24 +107,29 @@ public class TransformOperatorAppTest
   {
     public final transient DefaultOutputPort<TestPojo> output = new DefaultOutputPort<>();
 
-    @Override public void emitTuples()
+    @Override
+    public void emitTuples()
     {
       output.emit(new TestPojo("FirstName", "LastName"));
     }
 
-    @Override public void beginWindow(long l)
+    @Override
+    public void beginWindow(long l)
     {
     }
 
-    @Override public void endWindow()
+    @Override
+    public void endWindow()
     {
     }
 
-    @Override public void setup(Context.OperatorContext context)
+    @Override
+    public void setup(Context.OperatorContext context)
     {
     }
 
-    @Override public void teardown()
+    @Override
+    public void teardown()
     {
     }
   }
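
  This hunk only moves @Override annotations onto their own line, keeping the brace-on-next-line convention for types and methods. A minimal sketch with a hypothetical no-op lifecycle, not the actual Apex operator interfaces:

    public class OverrideStyleSketch
    {
      interface Lifecycle
      {
        void setup();

        void teardown();
      }

      static class NoOpLifecycle implements Lifecycle
      {
        @Override
        public void setup()
        {
          // intentionally empty
        }

        @Override
        public void teardown()
        {
          // intentionally empty
        }
      }

      public static void main(String[] args)
      {
        new NoOpLifecycle().setup();  // no output; compiles and runs as a style illustration
      }
    }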

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/util/DimensionTimeBucketSumOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/DimensionTimeBucketSumOperatorTest.java b/library/src/test/java/com/datatorrent/lib/util/DimensionTimeBucketSumOperatorTest.java
index 0b681ad..52d27b8 100644
--- a/library/src/test/java/com/datatorrent/lib/util/DimensionTimeBucketSumOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/util/DimensionTimeBucketSumOperatorTest.java
@@ -18,21 +18,22 @@
  */
 package com.datatorrent.lib.util;
 
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.lang.mutable.MutableDouble;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang.mutable.MutableDouble;
+
 import com.google.common.collect.Maps;
 
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class DimensionTimeBucketSumOperatorTest
 {
@@ -69,8 +70,8 @@ public class DimensionTimeBucketSumOperatorTest
     dimensionKey.add("url");
     try {
       oper.addCombination(dimensionKey);
-    }
-    catch (NoSuchFieldException e) {
+    } catch (NoSuchFieldException e) {
+      //ignored
     }
     oper.setTimeBucketFlags(AbstractDimensionTimeBucketOperator.TIMEBUCKET_MINUTE);
     oper.setup(null);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/util/JavaScriptFilterOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/JavaScriptFilterOperatorTest.java b/library/src/test/java/com/datatorrent/lib/util/JavaScriptFilterOperatorTest.java
index 6afa03d..6164104 100644
--- a/library/src/test/java/com/datatorrent/lib/util/JavaScriptFilterOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/util/JavaScriptFilterOperatorTest.java
@@ -18,12 +18,14 @@
  */
 package com.datatorrent.lib.util;
 
-import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
 import java.util.HashMap;
 import java.util.Map;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
+
 /**
  *
  * functional test for {@link com.datatorrent.lib.util.JavaScriptFilterOperator}.

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/util/KryoCloneUtilsTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/KryoCloneUtilsTest.java b/library/src/test/java/com/datatorrent/lib/util/KryoCloneUtilsTest.java
index 5eaea2e..5f284a9 100644
--- a/library/src/test/java/com/datatorrent/lib/util/KryoCloneUtilsTest.java
+++ b/library/src/test/java/com/datatorrent/lib/util/KryoCloneUtilsTest.java
@@ -68,7 +68,8 @@ public class KryoCloneUtilsTest
     assertFalse(from.transientProp.equals(to.transientProp));
   }
 
-  private TestEntity getTestEntity(int depth) {
+  private TestEntity getTestEntity(int depth)
+  {
     TestEntity returnVal = null;
     TestEntity curr = null;
     while (depth-- > 0) {
@@ -82,8 +83,8 @@ public class KryoCloneUtilsTest
     return returnVal;
   }
 
-
-  static class TestEntity {
+  static class TestEntity
+  {
 
     String strProp = RandomStringUtils.random(10);
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/util/PojoUtilsTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/PojoUtilsTest.java b/library/src/test/java/com/datatorrent/lib/util/PojoUtilsTest.java
index 138c731..e8553be 100644
--- a/library/src/test/java/com/datatorrent/lib/util/PojoUtilsTest.java
+++ b/library/src/test/java/com/datatorrent/lib/util/PojoUtilsTest.java
@@ -18,6 +18,26 @@
  */
 package com.datatorrent.lib.util;
 
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.lib.expression.Expression;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+import com.datatorrent.lib.util.PojoUtils.GetterBoolean;
+import com.datatorrent.lib.util.PojoUtils.GetterByte;
+import com.datatorrent.lib.util.PojoUtils.GetterChar;
+import com.datatorrent.lib.util.PojoUtils.GetterDouble;
+import com.datatorrent.lib.util.PojoUtils.GetterFloat;
+import com.datatorrent.lib.util.PojoUtils.GetterInt;
+import com.datatorrent.lib.util.PojoUtils.GetterLong;
+import com.datatorrent.lib.util.PojoUtils.GetterShort;
+import com.datatorrent.lib.util.PojoUtils.Setter;
+import com.datatorrent.lib.util.PojoUtils.SetterBoolean;
+import com.datatorrent.lib.util.PojoUtils.SetterByte;
+import com.datatorrent.lib.util.PojoUtils.SetterInt;
+import com.datatorrent.lib.util.PojoUtils.SetterLong;
+import com.datatorrent.lib.util.PojoUtils.SetterShort;
+
 import static com.datatorrent.lib.util.PojoUtils.constructGetter;
 import static com.datatorrent.lib.util.PojoUtils.constructSetter;
 import static com.datatorrent.lib.util.PojoUtils.createExpression;
@@ -45,26 +65,6 @@ import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.datatorrent.lib.expression.Expression;
-import com.datatorrent.lib.util.PojoUtils.GetterBoolean;
-import com.datatorrent.lib.util.PojoUtils.GetterByte;
-import com.datatorrent.lib.util.PojoUtils.GetterChar;
-import com.datatorrent.lib.util.PojoUtils.GetterDouble;
-import com.datatorrent.lib.util.PojoUtils.GetterFloat;
-import com.datatorrent.lib.util.PojoUtils.GetterInt;
-import com.datatorrent.lib.util.PojoUtils.GetterLong;
-import com.datatorrent.lib.util.PojoUtils.Getter;
-import com.datatorrent.lib.util.PojoUtils.GetterShort;
-import com.datatorrent.lib.util.PojoUtils.Setter;
-import com.datatorrent.lib.util.PojoUtils.SetterBoolean;
-import com.datatorrent.lib.util.PojoUtils.SetterByte;
-import com.datatorrent.lib.util.PojoUtils.SetterInt;
-import com.datatorrent.lib.util.PojoUtils.SetterLong;
-import com.datatorrent.lib.util.PojoUtils.SetterShort;
-
 
 
 public class PojoUtilsTest
@@ -288,7 +288,8 @@ public class PojoUtilsTest
     }
 
     @SuppressWarnings("unused")
-    public void setIntVal(Integer intVal) {
+    public void setIntVal(Integer intVal)
+    {
       intField = intVal;
     }
 
@@ -409,31 +410,33 @@ public class PojoUtilsTest
     assertEquals(8, testPojo.getIntVal());
 
     SetterByte<Object> setterByte = createSetterByte(testPojoClass, TestPojo.INT_FIELD_NAME);
-    setterByte.set(testPojo, (byte) 9);
+    setterByte.set(testPojo, (byte)9);
     assertEquals(9, testPojo.intField);
 
     setterByte = (SetterByte<Object>)constructSetter(testPojoClass, TestPojo.INT_FIELD_NAME, byte.class);
-    setterByte.set(testPojo, (byte) 10);
+    setterByte.set(testPojo, (byte)10);
     assertEquals(10, testPojo.intField);
 
     setterByte = createSetterByte(testPojoClass, TestPojo.INT_METHOD_NAME);
-    setterByte.set(testPojo, (byte) 11);
+    setterByte.set(testPojo, (byte)11);
     assertEquals(11, testPojo.getIntVal());
 
     setterByte = ((SetterByte<Object>)constructSetter(testPojoClass, TestPojo.INT_METHOD_NAME, byte.class));
-    setterByte.set(testPojo, (byte) 12);
+    setterByte.set(testPojo, (byte)12);
     assertEquals(12, testPojo.getIntVal());
 
-    createSetter(testPojoClass, TestPojo.INT_FIELD_NAME, Byte.class).set(testPojo, Byte.valueOf((byte) 13));
+    createSetter(testPojoClass, TestPojo.INT_FIELD_NAME, Byte.class).set(testPojo, Byte.valueOf((byte)13));
     assertEquals(13, testPojo.intField);
 
-    ((Setter<Object, Byte>)constructSetter(testPojoClass, TestPojo.INT_FIELD_NAME, Byte.class)).set(testPojo, Byte.valueOf((byte) 14));
+    ((Setter<Object, Byte>)constructSetter(testPojoClass, TestPojo.INT_FIELD_NAME, Byte.class)).set(testPojo,
+        Byte.valueOf((byte)14));
     assertEquals(14, testPojo.getIntVal());
 
-    createSetter(testPojoClass, TestPojo.INT_METHOD_NAME, Byte.class).set(testPojo, Byte.valueOf((byte) 15));
+    createSetter(testPojoClass, TestPojo.INT_METHOD_NAME, Byte.class).set(testPojo, Byte.valueOf((byte)15));
     assertEquals(15, testPojo.getIntVal());
 
-    ((Setter<Object, Byte>)constructSetter(testPojoClass, TestPojo.INT_METHOD_NAME, Byte.class)).set(testPojo, Byte.valueOf((byte) 16));
+    ((Setter<Object, Byte>)constructSetter(testPojoClass, TestPojo.INT_METHOD_NAME, Byte.class)).set(testPojo,
+        Byte.valueOf((byte)16));
     assertEquals(16, testPojo.getIntVal());
 
     SetterShort<Object> setterShort = createSetterShort(testPojoClass, TestPojo.INT_FIELD_NAME);
@@ -448,8 +451,8 @@ public class PojoUtilsTest
       @SuppressWarnings("unused")
       SetterLong<Object> setterLong = createSetterLong(testPojoClass, TestPojo.INT_FIELD_NAME);
       fail("long can't be assigned to the int field");
-    }
-    catch (Exception ignored) {
+    } catch (Exception ignored) {
+      //ignored
     }
 
   }
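
  The catch hunks above keep catch on the same line as the closing brace and add a comment to otherwise empty catch blocks. A minimal sketch of that pattern with a hypothetical parse fallback:

    public class EmptyCatchSketch
    {
      public static void main(String[] args)
      {
        int parsed = 0;
        try {
          parsed = Integer.parseInt("not-a-number");
        } catch (NumberFormatException ignored) {
          // ignored: keep the default value assigned above
        }
        System.out.println("parsed=" + parsed);  // prints parsed=0
      }
    }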

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
index 55351fc..f3b2140 100644
--- a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
+++ b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
@@ -58,11 +58,11 @@ public class TestUtils
     FileUtils.deleteQuietly(new File("target/" + description.getClassName()));
   }
   
-  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @SuppressWarnings({"unchecked", "rawtypes"})
   public static <S extends Sink, T> S setSink(OutputPort<T> port, S sink)
   {
-     port.setSink(sink);
-     return sink;
+    port.setSink(sink);
+    return sink;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/org/apache/apex/malhar/lib/dimensions/CustomTimeBucketRegistryTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dimensions/CustomTimeBucketRegistryTest.java b/library/src/test/java/org/apache/apex/malhar/lib/dimensions/CustomTimeBucketRegistryTest.java
index c9524b1..c448b72 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/dimensions/CustomTimeBucketRegistryTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/dimensions/CustomTimeBucketRegistryTest.java
@@ -21,8 +21,6 @@ package org.apache.apex.malhar.lib.dimensions;
 import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.apex.malhar.lib.dimensions.CustomTimeBucketRegistry;
-
 import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
 import com.datatorrent.lib.appdata.schemas.TimeBucket;
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/org/apache/apex/malhar/lib/dimensions/DimensionsDescriptorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dimensions/DimensionsDescriptorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/dimensions/DimensionsDescriptorTest.java
index 3101577..1514cd5 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/dimensions/DimensionsDescriptorTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/dimensions/DimensionsDescriptorTest.java
@@ -24,8 +24,6 @@ import java.util.concurrent.TimeUnit;
 import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.apex.malhar.lib.dimensions.DimensionsDescriptor;
-
 import com.google.common.collect.Sets;
 
 import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
@@ -60,9 +58,8 @@ public class DimensionsDescriptorTest
   @Test
   public void simpleTest2()
   {
-    DimensionsDescriptor ad = new DimensionsDescriptor(KEY_1_NAME +
-                                                       DimensionsDescriptor.DELIMETER_SEPERATOR +
-                                                       KEY_2_NAME);
+    DimensionsDescriptor ad = new DimensionsDescriptor(KEY_1_NAME + DimensionsDescriptor.DELIMETER_SEPERATOR +
+        KEY_2_NAME);
 
     Set<String> fields = Sets.newHashSet();
     fields.add(KEY_1_NAME);
@@ -75,11 +72,8 @@ public class DimensionsDescriptorTest
   @Test
   public void simpleTimeTest()
   {
-    DimensionsDescriptor ad = new DimensionsDescriptor(KEY_1_NAME +
-                                                       DimensionsDescriptor.DELIMETER_SEPERATOR +
-                                                       DimensionsDescriptor.DIMENSION_TIME +
-                                                       DimensionsDescriptor.DELIMETER_EQUALS +
-                                                       "DAYS");
+    DimensionsDescriptor ad = new DimensionsDescriptor(KEY_1_NAME + DimensionsDescriptor.DELIMETER_SEPERATOR +
+        DimensionsDescriptor.DIMENSION_TIME + DimensionsDescriptor.DELIMETER_EQUALS + "DAYS");
 
     Set<String> fields = Sets.newHashSet();
     fields.add(KEY_1_NAME);
@@ -92,10 +86,10 @@ public class DimensionsDescriptorTest
   public void equalsAndHashCodeTest()
   {
     DimensionsDescriptor ddA = new DimensionsDescriptor(new CustomTimeBucket(TimeBucket.MINUTE, 5L),
-                                                        new Fields(Sets.newHashSet("a", "b")));
+        new Fields(Sets.newHashSet("a", "b")));
 
     DimensionsDescriptor ddB = new DimensionsDescriptor(new CustomTimeBucket(TimeBucket.MINUTE, 5L),
-                                                        new Fields(Sets.newHashSet("a", "b")));
+        new Fields(Sets.newHashSet("a", "b")));
 
     Assert.assertTrue(ddB.equals(ddA));
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
index 4ba8905..d3564ba 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
@@ -18,7 +18,6 @@
  */
 package org.apache.apex.malhar.lib.state.managed;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.Map;
 import java.util.TreeMap;
@@ -26,11 +25,9 @@ import java.util.TreeMap;
 import javax.annotation.Nullable;
 
 import org.junit.Assert;
-import org.junit.runner.Description;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.RemoteIterator;