You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/05/18 20:41:54 UTC

[04/22] incubator-apex-malhar git commit: APEXMALHAR-2095 removed checkstyle violations of malhar library module

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/math/LogicalCompareTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/LogicalCompareTest.java b/library/src/test/java/com/datatorrent/lib/math/LogicalCompareTest.java
index 6ecb398..9fd2526 100644
--- a/library/src/test/java/com/datatorrent/lib/math/LogicalCompareTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/LogicalCompareTest.java
@@ -29,76 +29,76 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
  */
 public class LogicalCompareTest
 {
-	/**
-	 * Test operator logic emits correct results.
-	 */
-	@SuppressWarnings({ "rawtypes", "unchecked" })
+  /**
+   * Test operator logic emits correct results.
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
   @Test
-	public void testNodeProcessing()
-	{
-		LogicalCompare<Integer> oper = new LogicalCompare<Integer>()
-		{
-		};
-		CollectorTestSink eSink = new CollectorTestSink();
-		CollectorTestSink neSink = new CollectorTestSink();
-		CollectorTestSink gtSink = new CollectorTestSink();
-		CollectorTestSink gteSink = new CollectorTestSink();
-		CollectorTestSink ltSink = new CollectorTestSink();
-		CollectorTestSink lteSink = new CollectorTestSink();
+  public void testNodeProcessing()
+  {
+    LogicalCompare<Integer> oper = new LogicalCompare<Integer>()
+    {
+    };
+    CollectorTestSink eSink = new CollectorTestSink();
+    CollectorTestSink neSink = new CollectorTestSink();
+    CollectorTestSink gtSink = new CollectorTestSink();
+    CollectorTestSink gteSink = new CollectorTestSink();
+    CollectorTestSink ltSink = new CollectorTestSink();
+    CollectorTestSink lteSink = new CollectorTestSink();
 
-		oper.equalTo.setSink(eSink);
-		oper.notEqualTo.setSink(neSink);
-		oper.greaterThan.setSink(gtSink);
-		oper.greaterThanOrEqualTo.setSink(gteSink);
-		oper.lessThan.setSink(ltSink);
-		oper.lessThanOrEqualTo.setSink(lteSink);
+    oper.equalTo.setSink(eSink);
+    oper.notEqualTo.setSink(neSink);
+    oper.greaterThan.setSink(gtSink);
+    oper.greaterThanOrEqualTo.setSink(gteSink);
+    oper.lessThan.setSink(ltSink);
+    oper.lessThanOrEqualTo.setSink(lteSink);
 
-		Pair<Integer, Integer> gtuple = new Pair<Integer, Integer>(2, 1);
-		Pair<Integer, Integer> etuple = new Pair<Integer, Integer>(2, 2);
-		Pair<Integer, Integer> ltuple = new Pair<Integer, Integer>(2, 3);
+    Pair<Integer, Integer> gtuple = new Pair<Integer, Integer>(2, 1);
+    Pair<Integer, Integer> etuple = new Pair<Integer, Integer>(2, 2);
+    Pair<Integer, Integer> ltuple = new Pair<Integer, Integer>(2, 3);
 
-		oper.beginWindow(0); //
+    oper.beginWindow(0); //
 
-		oper.input.process(gtuple);
-		oper.input.process(etuple);
-		oper.input.process(ltuple);
+    oper.input.process(gtuple);
+    oper.input.process(etuple);
+    oper.input.process(ltuple);
 
-		oper.endWindow(); //
+    oper.endWindow(); //
 
-		Assert.assertEquals("number emitted tuples", 1,
-				eSink.collectedTuples.size());
-		Assert.assertEquals("tuples were",
-				eSink.collectedTuples.get(0).equals(etuple), true);
+    Assert.assertEquals("number emitted tuples", 1,
+        eSink.collectedTuples.size());
+    Assert.assertEquals("tuples were",
+        eSink.collectedTuples.get(0).equals(etuple), true);
 
-		Assert.assertEquals("number emitted tuples", 2,
-				neSink.collectedTuples.size());
-		Assert.assertEquals("tuples were",
-				neSink.collectedTuples.get(0).equals(gtuple), true);
-		Assert.assertEquals("tuples were",
-				neSink.collectedTuples.get(1).equals(ltuple), true);
+    Assert.assertEquals("number emitted tuples", 2,
+        neSink.collectedTuples.size());
+    Assert.assertEquals("tuples were",
+        neSink.collectedTuples.get(0).equals(gtuple), true);
+    Assert.assertEquals("tuples were",
+        neSink.collectedTuples.get(1).equals(ltuple), true);
 
-		Assert.assertEquals("number emitted tuples", 1,
-				gtSink.collectedTuples.size());
-		Assert.assertEquals("tuples were",
-				gtSink.collectedTuples.get(0).equals(gtuple), true);
+    Assert.assertEquals("number emitted tuples", 1,
+        gtSink.collectedTuples.size());
+    Assert.assertEquals("tuples were",
+        gtSink.collectedTuples.get(0).equals(gtuple), true);
 
-		Assert.assertEquals("number emitted tuples", 2,
-				gteSink.collectedTuples.size());
-		Assert.assertEquals("tuples were",
-				gteSink.collectedTuples.get(0).equals(gtuple), true);
-		Assert.assertEquals("tuples were",
-				gteSink.collectedTuples.get(1).equals(etuple), true);
+    Assert.assertEquals("number emitted tuples", 2,
+        gteSink.collectedTuples.size());
+    Assert.assertEquals("tuples were",
+        gteSink.collectedTuples.get(0).equals(gtuple), true);
+    Assert.assertEquals("tuples were",
+        gteSink.collectedTuples.get(1).equals(etuple), true);
 
-		Assert.assertEquals("number emitted tuples", 1,
-				ltSink.collectedTuples.size());
-		Assert.assertEquals("tuples were",
-				ltSink.collectedTuples.get(0).equals(ltuple), true);
+    Assert.assertEquals("number emitted tuples", 1,
+        ltSink.collectedTuples.size());
+    Assert.assertEquals("tuples were",
+        ltSink.collectedTuples.get(0).equals(ltuple), true);
 
-		Assert.assertEquals("number emitted tuples", 2,
-				lteSink.collectedTuples.size());
-		Assert.assertEquals("tuples were",
-				lteSink.collectedTuples.get(0).equals(etuple), true);
-		Assert.assertEquals("tuples were",
-				lteSink.collectedTuples.get(1).equals(ltuple), true);
-	}
+    Assert.assertEquals("number emitted tuples", 2,
+        lteSink.collectedTuples.size());
+    Assert.assertEquals("tuples were",
+        lteSink.collectedTuples.get(0).equals(etuple), true);
+    Assert.assertEquals("tuples were",
+        lteSink.collectedTuples.get(1).equals(ltuple), true);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/math/LogicalCompareToConstantTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/LogicalCompareToConstantTest.java b/library/src/test/java/com/datatorrent/lib/math/LogicalCompareToConstantTest.java
index c813629..df7ef2d 100644
--- a/library/src/test/java/com/datatorrent/lib/math/LogicalCompareToConstantTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/LogicalCompareToConstantTest.java
@@ -32,72 +32,72 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
  */
 public class LogicalCompareToConstantTest
 {
-	/**
-	 * Test operator logic emits correct results.
-	 */
-	@SuppressWarnings({ "rawtypes", "unchecked" })
+  /**
+   * Test operator logic emits correct results.
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
   @Test
-	public void testNodeProcessing()
-	{
-		LogicalCompareToConstant<Integer> oper = new LogicalCompareToConstant<Integer>()
-		{
-		};
-		CollectorTestSink eSink = new CollectorTestSink();
-		CollectorTestSink neSink = new CollectorTestSink();
-		CollectorTestSink gtSink = new CollectorTestSink();
-		CollectorTestSink gteSink = new CollectorTestSink();
-		CollectorTestSink ltSink = new CollectorTestSink();
-		CollectorTestSink lteSink = new CollectorTestSink();
+  public void testNodeProcessing()
+  {
+    LogicalCompareToConstant<Integer> oper = new LogicalCompareToConstant<Integer>()
+    {
+    };
+    CollectorTestSink eSink = new CollectorTestSink();
+    CollectorTestSink neSink = new CollectorTestSink();
+    CollectorTestSink gtSink = new CollectorTestSink();
+    CollectorTestSink gteSink = new CollectorTestSink();
+    CollectorTestSink ltSink = new CollectorTestSink();
+    CollectorTestSink lteSink = new CollectorTestSink();
 
-		oper.equalTo.setSink(eSink);
-		oper.notEqualTo.setSink(neSink);
-		oper.greaterThan.setSink(gtSink);
-		oper.greaterThanOrEqualTo.setSink(gteSink);
-		oper.lessThan.setSink(ltSink);
-		oper.lessThanOrEqualTo.setSink(lteSink);
-		oper.setConstant(2);
+    oper.equalTo.setSink(eSink);
+    oper.notEqualTo.setSink(neSink);
+    oper.greaterThan.setSink(gtSink);
+    oper.greaterThanOrEqualTo.setSink(gteSink);
+    oper.lessThan.setSink(ltSink);
+    oper.lessThanOrEqualTo.setSink(lteSink);
+    oper.setConstant(2);
 
-		oper.beginWindow(0); //
-		oper.input.process(1);
-		oper.input.process(2);
-		oper.input.process(3);
+    oper.beginWindow(0); //
+    oper.input.process(1);
+    oper.input.process(2);
+    oper.input.process(3);
 
-		oper.endWindow(); //
+    oper.endWindow(); //
 
-		Assert.assertEquals("number emitted tuples", 1,
-				eSink.collectedTuples.size());
-		Assert.assertEquals("tuples were", eSink.collectedTuples.get(0).equals(2),
-				true);
+    Assert.assertEquals("number emitted tuples", 1,
+        eSink.collectedTuples.size());
+    Assert.assertEquals("tuples were", eSink.collectedTuples.get(0).equals(2),
+        true);
 
-		Assert.assertEquals("number emitted tuples", 2,
-				neSink.collectedTuples.size());
-		Assert.assertEquals("tuples were", neSink.collectedTuples.get(0).equals(1),
-				true);
-		Assert.assertEquals("tuples were", neSink.collectedTuples.get(1).equals(3),
-				true);
+    Assert.assertEquals("number emitted tuples", 2,
+        neSink.collectedTuples.size());
+    Assert.assertEquals("tuples were", neSink.collectedTuples.get(0).equals(1),
+        true);
+    Assert.assertEquals("tuples were", neSink.collectedTuples.get(1).equals(3),
+        true);
 
-		Assert.assertEquals("number emitted tuples", 1,
-				gtSink.collectedTuples.size());
-		Assert.assertEquals("tuples were", gtSink.collectedTuples.get(0).equals(1),
-				true);
+    Assert.assertEquals("number emitted tuples", 1,
+        gtSink.collectedTuples.size());
+    Assert.assertEquals("tuples were", gtSink.collectedTuples.get(0).equals(1),
+        true);
 
-		Assert.assertEquals("number emitted tuples", 2,
-				gteSink.collectedTuples.size());
-		Assert.assertEquals("tuples were",
-				gteSink.collectedTuples.get(0).equals(1), true);
-		Assert.assertEquals("tuples were",
-				gteSink.collectedTuples.get(1).equals(2), true);
+    Assert.assertEquals("number emitted tuples", 2,
+        gteSink.collectedTuples.size());
+    Assert.assertEquals("tuples were",
+        gteSink.collectedTuples.get(0).equals(1), true);
+    Assert.assertEquals("tuples were",
+        gteSink.collectedTuples.get(1).equals(2), true);
 
-		Assert.assertEquals("number emitted tuples", 1,
-				ltSink.collectedTuples.size());
-		Assert.assertEquals("tuples were", ltSink.collectedTuples.get(0).equals(3),
-				true);
+    Assert.assertEquals("number emitted tuples", 1,
+        ltSink.collectedTuples.size());
+    Assert.assertEquals("tuples were", ltSink.collectedTuples.get(0).equals(3),
+        true);
 
-		Assert.assertEquals("number emitted tuples", 2,
-				lteSink.collectedTuples.size());
-		Assert.assertEquals("tuples were",
-				lteSink.collectedTuples.get(0).equals(2), true);
-		Assert.assertEquals("tuples were",
-				lteSink.collectedTuples.get(1).equals(3), true);
-	}
+    Assert.assertEquals("number emitted tuples", 2,
+        lteSink.collectedTuples.size());
+    Assert.assertEquals("tuples were",
+        lteSink.collectedTuples.get(0).equals(2), true);
+    Assert.assertEquals("tuples were",
+        lteSink.collectedTuples.get(1).equals(3), true);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/math/MarginKeyValTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/MarginKeyValTest.java b/library/src/test/java/com/datatorrent/lib/math/MarginKeyValTest.java
index 884e981..365df7a 100644
--- a/library/src/test/java/com/datatorrent/lib/math/MarginKeyValTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/MarginKeyValTest.java
@@ -29,57 +29,53 @@ import com.datatorrent.lib.util.KeyValPair;
  */
 public class MarginKeyValTest
 {
-	/**
-	 * Test node logic emits correct results.
-	 */
-	@Test
-	public void testNodeProcessing() throws Exception
-	{
-		testNodeProcessingSchema(new MarginKeyVal<String, Integer>());
-		testNodeProcessingSchema(new MarginKeyVal<String, Double>());
-		testNodeProcessingSchema(new MarginKeyVal<String, Float>());
-		testNodeProcessingSchema(new MarginKeyVal<String, Short>());
-		testNodeProcessingSchema(new MarginKeyVal<String, Long>());
-	}
+  /**
+   * Test node logic emits correct results.
+   */
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    testNodeProcessingSchema(new MarginKeyVal<String, Integer>());
+    testNodeProcessingSchema(new MarginKeyVal<String, Double>());
+    testNodeProcessingSchema(new MarginKeyVal<String, Float>());
+    testNodeProcessingSchema(new MarginKeyVal<String, Short>());
+    testNodeProcessingSchema(new MarginKeyVal<String, Long>());
+  }
 
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	public void testNodeProcessingSchema(MarginKeyVal oper)
-	{
-		CollectorTestSink marginSink = new CollectorTestSink();
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public void testNodeProcessingSchema(MarginKeyVal oper)
+  {
+    CollectorTestSink marginSink = new CollectorTestSink();
 
-		oper.margin.setSink(marginSink);
+    oper.margin.setSink(marginSink);
 
-		oper.beginWindow(0);
-		oper.numerator.process(new KeyValPair("a", 2));
-		oper.numerator.process(new KeyValPair("b", 20));
-		oper.numerator.process(new KeyValPair("c", 1000));
+    oper.beginWindow(0);
+    oper.numerator.process(new KeyValPair("a", 2));
+    oper.numerator.process(new KeyValPair("b", 20));
+    oper.numerator.process(new KeyValPair("c", 1000));
 
-		oper.denominator.process(new KeyValPair("a", 2));
-		oper.denominator.process(new KeyValPair("b", 40));
-		oper.denominator.process(new KeyValPair("c", 500));
-		oper.endWindow();
+    oper.denominator.process(new KeyValPair("a", 2));
+    oper.denominator.process(new KeyValPair("b", 40));
+    oper.denominator.process(new KeyValPair("c", 500));
+    oper.endWindow();
 
-		Assert.assertEquals("number emitted tuples", 3,
-				marginSink.collectedTuples.size());
-		for (int i = 0; i < marginSink.collectedTuples.size(); i++) {
-			if ("a".equals(((KeyValPair<String, Number>) marginSink.collectedTuples
-					.get(i)).getKey())) {
-				Assert.assertEquals("emitted value for 'a' was ", new Double(0),
-						((KeyValPair<String, Number>) marginSink.collectedTuples.get(i))
-								.getValue().doubleValue(), 0);
-			}
-			if ("b".equals(((KeyValPair<String, Number>) marginSink.collectedTuples
-					.get(i)).getKey())) {
-				Assert.assertEquals("emitted value for 'b' was ", new Double(0.5),
-						((KeyValPair<String, Number>) marginSink.collectedTuples.get(i))
-								.getValue().doubleValue(), 0);
-			}
-			if ("c".equals(((KeyValPair<String, Number>) marginSink.collectedTuples
-					.get(i)).getKey())) {
-				Assert.assertEquals("emitted value for 'c' was ", new Double(-1),
-						((KeyValPair<String, Number>) marginSink.collectedTuples.get(i))
-								.getValue().doubleValue(), 0);
-			}
-		}
-	}
+    Assert.assertEquals("number emitted tuples", 3,
+        marginSink.collectedTuples.size());
+    for (int i = 0; i < marginSink.collectedTuples.size(); i++) {
+      if ("a".equals(((KeyValPair<String, Number>)marginSink.collectedTuples.get(i)).getKey())) {
+        Assert.assertEquals("emitted value for 'a' was ", 0d,
+            ((KeyValPair<String, Number>)marginSink.collectedTuples.get(i)).getValue().doubleValue(), 0);
+      }
+      if ("b".equals(((KeyValPair<String, Number>)marginSink.collectedTuples
+          .get(i)).getKey())) {
+        Assert.assertEquals("emitted value for 'b' was ", 0.5,
+            ((KeyValPair<String, Number>)marginSink.collectedTuples.get(i)).getValue().doubleValue(), 0);
+      }
+      if ("c".equals(((KeyValPair<String, Number>)marginSink.collectedTuples
+          .get(i)).getKey())) {
+        Assert.assertEquals("emitted value for 'c' was ", (double)-1,
+            ((KeyValPair<String, Number>)marginSink.collectedTuples.get(i)).getValue().doubleValue(), 0);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/math/MarginMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/MarginMapTest.java b/library/src/test/java/com/datatorrent/lib/math/MarginMapTest.java
index e16f3dc..07a378c 100644
--- a/library/src/test/java/com/datatorrent/lib/math/MarginMapTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/MarginMapTest.java
@@ -36,62 +36,58 @@ import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
  */
 public class MarginMapTest
 {
-	private static Logger LOG = LoggerFactory.getLogger(MarginMapTest.class);
+  private static Logger LOG = LoggerFactory.getLogger(MarginMapTest.class);
 
-	/**
-	 * Test node logic emits correct results
-	 */
-	@Test
-	public void testNodeProcessing() throws Exception
-	{
-		testNodeProcessingSchema(new MarginMap<String, Integer>());
-		testNodeProcessingSchema(new MarginMap<String, Double>());
-		testNodeProcessingSchema(new MarginMap<String, Float>());
-		testNodeProcessingSchema(new MarginMap<String, Short>());
-		testNodeProcessingSchema(new MarginMap<String, Long>());
-	}
+  /**
+   * Test node logic emits correct results
+   */
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    testNodeProcessingSchema(new MarginMap<String, Integer>());
+    testNodeProcessingSchema(new MarginMap<String, Double>());
+    testNodeProcessingSchema(new MarginMap<String, Float>());
+    testNodeProcessingSchema(new MarginMap<String, Short>());
+    testNodeProcessingSchema(new MarginMap<String, Long>());
+  }
 
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	public void testNodeProcessingSchema(MarginMap oper)
-	{
-		CountAndLastTupleTestSink marginSink = new CountAndLastTupleTestSink();
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public void testNodeProcessingSchema(MarginMap oper)
+  {
+    CountAndLastTupleTestSink marginSink = new CountAndLastTupleTestSink();
 
-		oper.margin.setSink(marginSink);
+    oper.margin.setSink(marginSink);
 
-		oper.beginWindow(0);
-		HashMap<String, Number> input = new HashMap<String, Number>();
-		input.put("a", 2);
-		input.put("b", 20);
-		input.put("c", 1000);
-		oper.numerator.process(input);
+    oper.beginWindow(0);
+    HashMap<String, Number> input = new HashMap<String, Number>();
+    input.put("a", 2);
+    input.put("b", 20);
+    input.put("c", 1000);
+    oper.numerator.process(input);
 
-		input.clear();
-		input.put("a", 2);
-		input.put("b", 40);
-		input.put("c", 500);
-		oper.denominator.process(input);
+    input.clear();
+    input.put("a", 2);
+    input.put("b", 40);
+    input.put("c", 500);
+    oper.denominator.process(input);
 
-		oper.endWindow();
+    oper.endWindow();
 
-		// One for each key
-		Assert.assertEquals("number emitted tuples", 1, marginSink.count);
+    // One for each key
+    Assert.assertEquals("number emitted tuples", 1, marginSink.count);
 
-		HashMap<String, Number> output = (HashMap<String, Number>) marginSink.tuple;
-		for (Map.Entry<String, Number> e : output.entrySet()) {
-			LOG.debug(String.format("Key, value is %s,%f", e.getKey(), e.getValue()
-					.doubleValue()));
-			if (e.getKey().equals("a")) {
-				Assert.assertEquals("emitted value for 'a' was ", new Double(0), e
-						.getValue().doubleValue(), 0);
-			} else if (e.getKey().equals("b")) {
-				Assert.assertEquals("emitted tuple for 'b' was ", new Double(0.5), e
-						.getValue().doubleValue(), 0);
-			} else if (e.getKey().equals("c")) {
-				Assert.assertEquals("emitted tuple for 'c' was ", new Double(-1.0), e
-						.getValue().doubleValue(), 0);
-			} else {
-				LOG.debug(String.format("key was %s", e.getKey()));
-			}
-		}
-	}
+    HashMap<String, Number> output = (HashMap<String, Number>)marginSink.tuple;
+    for (Map.Entry<String, Number> e : output.entrySet()) {
+      LOG.debug(String.format("Key, value is %s,%f", e.getKey(), e.getValue().doubleValue()));
+      if (e.getKey().equals("a")) {
+        Assert.assertEquals("emitted value for 'a' was ", 0d, e.getValue().doubleValue(), 0);
+      } else if (e.getKey().equals("b")) {
+        Assert.assertEquals("emitted tuple for 'b' was ", 0.5, e.getValue().doubleValue(), 0);
+      } else if (e.getKey().equals("c")) {
+        Assert.assertEquals("emitted tuple for 'c' was ", -1.0, e.getValue().doubleValue(), 0);
+      } else {
+        LOG.debug(String.format("key was %s", e.getKey()));
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/math/MarginTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/MarginTest.java b/library/src/test/java/com/datatorrent/lib/math/MarginTest.java
index 899a7ee..4ae6fe8 100644
--- a/library/src/test/java/com/datatorrent/lib/math/MarginTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/MarginTest.java
@@ -45,7 +45,7 @@ public class MarginTest
   }
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
-	public void testNodeProcessingSchema(Margin oper)
+  public void testNodeProcessingSchema(Margin oper)
   {
     CountAndLastTupleTestSink marginSink = new CountAndLastTupleTestSink();
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/math/MaxKeyValTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/MaxKeyValTest.java b/library/src/test/java/com/datatorrent/lib/math/MaxKeyValTest.java
index a532da3..55cf5b4 100644
--- a/library/src/test/java/com/datatorrent/lib/math/MaxKeyValTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/MaxKeyValTest.java
@@ -23,11 +23,10 @@ import java.util.ArrayList;
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.InputOperator;
-
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
 import com.datatorrent.lib.util.KeyValPair;
 
@@ -66,28 +65,24 @@ public class MaxKeyValTest
     int numtuples = 10000;
     if (type.equals("integer")) {
       for (int i = 0; i < numtuples; i++) {
-        oper.data.process(new KeyValPair("a", new Integer(i)));
+        oper.data.process(new KeyValPair("a", i));
       }
-    }
-    else if (type.equals("double")) {
+    } else if (type.equals("double")) {
       for (int i = 0; i < numtuples; i++) {
-        oper.data.process(new KeyValPair("a", new Double(i)));
+        oper.data.process(new KeyValPair("a", (double)i));
       }
-    }
-    else if (type.equals("long")) {
+    } else if (type.equals("long")) {
       for (int i = 0; i < numtuples; i++) {
-        oper.data.process(new KeyValPair("a", new Long(i)));
+        oper.data.process(new KeyValPair("a", (long)i));
       }
-    }
-    else if (type.equals("short")) {
+    } else if (type.equals("short")) {
       int count = numtuples / 1000; // cannot cross 64K
       for (short j = 0; j < count; j++) {
-        oper.data.process(new KeyValPair("a", new Short(j)));
+        oper.data.process(new KeyValPair("a", j));
       }
-    }
-    else if (type.equals("float")) {
+    } else if (type.equals("float")) {
       for (int i = 0; i < numtuples; i++) {
-        oper.data.process(new KeyValPair("a", new Float(i)));
+        oper.data.process(new KeyValPair("a", (float)i));
       }
     }
 
@@ -97,8 +92,7 @@ public class MaxKeyValTest
     Number val = ((KeyValPair<String, Number>)maxSink.tuple).getValue().intValue();
     if (type.equals("short")) {
       Assert.assertEquals("emitted max value was ", (new Double(numtuples / 1000 - 1)).intValue(), val);
-    }
-    else {
+    } else {
       Assert.assertEquals("emitted max value was ", (new Double(numtuples - 1)).intValue(), val);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/math/MaxTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/MaxTest.java b/library/src/test/java/com/datatorrent/lib/math/MaxTest.java
index 87037b4..a294c26 100644
--- a/library/src/test/java/com/datatorrent/lib/math/MaxTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/MaxTest.java
@@ -41,9 +41,9 @@ public class MaxTest
 
     oper.beginWindow(0); //
 
-    Double a = new Double(2.0);
-    Double b = new Double(20.0);
-    Double c = new Double(1000.0);
+    Double a = 2.0;
+    Double b = 20.0;
+    Double c = 1000.0;
 
     oper.data.process(a);
     oper.data.process(b);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/math/MinKeyValTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/MinKeyValTest.java b/library/src/test/java/com/datatorrent/lib/math/MinKeyValTest.java
index be6461d..6d9371c 100644
--- a/library/src/test/java/com/datatorrent/lib/math/MinKeyValTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/MinKeyValTest.java
@@ -23,11 +23,10 @@ import java.util.ArrayList;
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.InputOperator;
-
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
 import com.datatorrent.lib.util.KeyValPair;
 
@@ -36,105 +35,104 @@ import com.datatorrent.lib.util.KeyValPair;
  */
 public class MinKeyValTest
 {
-	/**
-	 * Test functional logic
-	 */
-	@Test
-	public void testNodeProcessing()
-	{
-		testSchemaNodeProcessing(new MinKeyVal<String, Integer>(), "integer");
-		testSchemaNodeProcessing(new MinKeyVal<String, Double>(), "double");
-		testSchemaNodeProcessing(new MinKeyVal<String, Long>(), "long");
-		testSchemaNodeProcessing(new MinKeyVal<String, Short>(), "short");
-		testSchemaNodeProcessing(new MinKeyVal<String, Float>(), "float");
-	}
+  /**
+   * Test functional logic
+   */
+  @Test
+  public void testNodeProcessing()
+  {
+    testSchemaNodeProcessing(new MinKeyVal<String, Integer>(), "integer");
+    testSchemaNodeProcessing(new MinKeyVal<String, Double>(), "double");
+    testSchemaNodeProcessing(new MinKeyVal<String, Long>(), "long");
+    testSchemaNodeProcessing(new MinKeyVal<String, Short>(), "short");
+    testSchemaNodeProcessing(new MinKeyVal<String, Float>(), "float");
+  }
 
-	/**
-	 * Test operator logic emits correct results for each schema.
-	 *
-	 */
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	public void testSchemaNodeProcessing(MinKeyVal oper, String type)
-	{
-		CountAndLastTupleTestSink minSink = new CountAndLastTupleTestSink();
-		oper.min.setSink(minSink);
+  /**
+   * Test operator logic emits correct results for each schema.
+   *
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public void testSchemaNodeProcessing(MinKeyVal oper, String type)
+  {
+    CountAndLastTupleTestSink minSink = new CountAndLastTupleTestSink();
+    oper.min.setSink(minSink);
 
-		oper.beginWindow(0);
+    oper.beginWindow(0);
 
-		int numtuples = 10000;
-		if (type.equals("integer")) {
-			for (int i = numtuples; i > 0; i--) {
-				oper.data.process(new KeyValPair("a", new Integer(i)));
-			}
-		} else if (type.equals("double")) {
-			for (int i = numtuples; i > 0; i--) {
-				oper.data.process(new KeyValPair("a", new Double(i)));
-			}
-		} else if (type.equals("long")) {
-			for (int i = numtuples; i > 0; i--) {
-				oper.data.process(new KeyValPair("a", new Long(i)));
-			}
-		} else if (type.equals("short")) {
-			for (short j = 1000; j > 0; j--) { // cannot cross 64K
-				oper.data.process(new KeyValPair("a", new Short(j)));
-			}
-		} else if (type.equals("float")) {
-			for (int i = numtuples; i > 0; i--) {
-				oper.data.process(new KeyValPair("a", new Float(i)));
-			}
-		}
+    int numtuples = 10000;
+    if (type.equals("integer")) {
+      for (int i = numtuples; i > 0; i--) {
+        oper.data.process(new KeyValPair("a", new Integer(i)));
+      }
+    } else if (type.equals("double")) {
+      for (int i = numtuples; i > 0; i--) {
+        oper.data.process(new KeyValPair("a", (double)i));
+      }
+    } else if (type.equals("long")) {
+      for (int i = numtuples; i > 0; i--) {
+        oper.data.process(new KeyValPair("a", (long)i));
+      }
+    } else if (type.equals("short")) {
+      for (short j = 1000; j > 0; j--) { // cannot cross 64K
+        oper.data.process(new KeyValPair("a", j));
+      }
+    } else if (type.equals("float")) {
+      for (int i = numtuples; i > 0; i--) {
+        oper.data.process(new KeyValPair("a", (float)i));
+      }
+    }
 
-		oper.endWindow();
+    oper.endWindow();
 
-		Assert.assertEquals("number emitted tuples", 1, minSink.count);
-		Number val = ((KeyValPair<String, Number>) minSink.tuple).getValue()
-				.intValue();
-		if (type.equals("short")) {
-			Assert.assertEquals("emitted min value was ", 1, val);
-		} else {
-			Assert.assertEquals("emitted min value was ", 1, val);
-		}
-	}
+    Assert.assertEquals("number emitted tuples", 1, minSink.count);
+    Number val = ((KeyValPair<String, Number>)minSink.tuple).getValue().intValue();
+    if (type.equals("short")) {
+      Assert.assertEquals("emitted min value was ", 1, val);
+    } else {
+      Assert.assertEquals("emitted min value was ", 1, val);
+    }
+  }
 
-	/**
-	 * Used to test partitioning.
-	 */
-	public static class TestInputOperator extends BaseOperator implements
-			InputOperator
-	{
-		public final transient DefaultOutputPort<KeyValPair<String, Integer>> output = new DefaultOutputPort<KeyValPair<String, Integer>>();
-		private transient boolean first = true;
+  /**
+   * Used to test partitioning.
+   */
+  public static class TestInputOperator extends BaseOperator implements
+      InputOperator
+  {
+    public final transient DefaultOutputPort<KeyValPair<String, Integer>> output = new DefaultOutputPort<KeyValPair<String, Integer>>();
+    private transient boolean first = true;
 
-		@SuppressWarnings({ "unchecked", "rawtypes" })
-		@Override
-		public void emitTuples()
-		{
-			if (first) {
-				for (int i = 40; i < 100; i++) {
-					output.emit(new KeyValPair("a", new Integer(i)));
-				}
-				for (int i = 50; i < 100; i++) {
-					output.emit(new KeyValPair("b", new Integer(i)));
-				}
-				for (int i = 60; i < 100; i++) {
-					output.emit(new KeyValPair("c", new Integer(i)));
-				}
-				first = false;
-			}
-		}
-	}
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Override
+    public void emitTuples()
+    {
+      if (first) {
+        for (int i = 40; i < 100; i++) {
+          output.emit(new KeyValPair("a", i));
+        }
+        for (int i = 50; i < 100; i++) {
+          output.emit(new KeyValPair("b", i));
+        }
+        for (int i = 60; i < 100; i++) {
+          output.emit(new KeyValPair("c", i));
+        }
+        first = false;
+      }
+    }
+  }
 
-	public static class CollectorOperator extends BaseOperator
-	{
-		public static final ArrayList<KeyValPair<String, Integer>> buffer = new ArrayList<KeyValPair<String, Integer>>();
-		public final transient DefaultInputPort<KeyValPair<String, Integer>> input = new DefaultInputPort<KeyValPair<String, Integer>>()
-		{
-			@SuppressWarnings({ "unchecked", "rawtypes" })
-			@Override
-			public void process(KeyValPair<String, Integer> tuple)
-			{
-				buffer.add(new KeyValPair(tuple.getKey(), tuple.getValue()));
-			}
-		};
-	}
+  public static class CollectorOperator extends BaseOperator
+  {
+    public static final ArrayList<KeyValPair<String, Integer>> buffer = new ArrayList<KeyValPair<String, Integer>>();
+    public final transient DefaultInputPort<KeyValPair<String, Integer>> input = new DefaultInputPort<KeyValPair<String, Integer>>()
+    {
+      @SuppressWarnings({ "unchecked", "rawtypes" })
+      @Override
+      public void process(KeyValPair<String, Integer> tuple)
+      {
+        buffer.add(new KeyValPair(tuple.getKey(), tuple.getValue()));
+      }
+    };
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/math/MinTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/MinTest.java b/library/src/test/java/com/datatorrent/lib/math/MinTest.java
index 18aa8cd..dfffae3 100644
--- a/library/src/test/java/com/datatorrent/lib/math/MinTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/MinTest.java
@@ -34,7 +34,7 @@ public class MinTest
    * Test oper logic emits correct results
    */
   @SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
+  @Test
   public void testNodeSchemaProcessing()
   {
     Min<Double> oper = new Min<Double>();
@@ -43,9 +43,9 @@ public class MinTest
 
     oper.beginWindow(0); //
 
-    Double a = new Double(2.0);
-    Double b = new Double(20.0);
-    Double c = new Double(1000.0);
+    Double a = 2.0;
+    Double b = 20.0;
+    Double c = 1000.0;
 
     oper.data.process(a);
     oper.data.process(b);
@@ -74,6 +74,6 @@ public class MinTest
     oper.endWindow(); //
 
     Assert.assertEquals("number emitted tuples", 1, minSink.count);
-    Assert.assertEquals("emitted high value was ", new Double(1.0), minSink.tuple);
+    Assert.assertEquals("emitted high value was ", 1.0, minSink.tuple);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/math/MultiplyByConstantTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/MultiplyByConstantTest.java b/library/src/test/java/com/datatorrent/lib/math/MultiplyByConstantTest.java
index 0f5283b..68e89eb 100644
--- a/library/src/test/java/com/datatorrent/lib/math/MultiplyByConstantTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/MultiplyByConstantTest.java
@@ -28,7 +28,7 @@ import com.datatorrent.lib.testbench.SumTestSink;
  */
 public class MultiplyByConstantTest
 {
-	/**
+  /**
    * Test oper logic emits correct results
    */
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/math/QuotientMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/QuotientMapTest.java b/library/src/test/java/com/datatorrent/lib/math/QuotientMapTest.java
index 7ff2e66..92c0e77 100644
--- a/library/src/test/java/com/datatorrent/lib/math/QuotientMapTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/QuotientMapTest.java
@@ -33,60 +33,60 @@ import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
  */
 public class QuotientMapTest
 {
-	private static Logger LOG = LoggerFactory.getLogger(QuotientMap.class);
+  private static Logger LOG = LoggerFactory.getLogger(QuotientMap.class);
 
-	/**
-	 * Test node logic emits correct results
-	 */
-	@Test
-	public void testNodeProcessing() throws Exception
-	{
-		testNodeProcessingSchema(new QuotientMap<String, Integer>());
-		testNodeProcessingSchema(new QuotientMap<String, Double>());
-	}
+  /**
+   * Test node logic emits correct results
+   */
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    testNodeProcessingSchema(new QuotientMap<String, Integer>());
+    testNodeProcessingSchema(new QuotientMap<String, Double>());
+  }
 
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	public void testNodeProcessingSchema(QuotientMap oper) throws Exception
-	{
-		CountAndLastTupleTestSink quotientSink = new CountAndLastTupleTestSink();
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public void testNodeProcessingSchema(QuotientMap oper) throws Exception
+  {
+    CountAndLastTupleTestSink quotientSink = new CountAndLastTupleTestSink();
 
-		oper.quotient.setSink(quotientSink);
-		oper.setMult_by(2);
+    oper.quotient.setSink(quotientSink);
+    oper.setMult_by(2);
 
-		oper.beginWindow(0); //
-		HashMap<String, Number> input = new HashMap<String, Number>();
-		int numtuples = 100;
-		for (int i = 0; i < numtuples; i++) {
-			input.clear();
-			input.put("a", 2);
-			input.put("b", 20);
-			input.put("c", 1000);
-			oper.numerator.process(input);
-			input.clear();
-			input.put("a", 2);
-			input.put("b", 40);
-			input.put("c", 500);
-			oper.denominator.process(input);
-		}
+    oper.beginWindow(0); //
+    HashMap<String, Number> input = new HashMap<String, Number>();
+    int numtuples = 100;
+    for (int i = 0; i < numtuples; i++) {
+      input.clear();
+      input.put("a", 2);
+      input.put("b", 20);
+      input.put("c", 1000);
+      oper.numerator.process(input);
+      input.clear();
+      input.put("a", 2);
+      input.put("b", 40);
+      input.put("c", 500);
+      oper.denominator.process(input);
+    }
 
-		oper.endWindow();
+    oper.endWindow();
 
-		// One for each key
-		Assert.assertEquals("number emitted tuples", 1, quotientSink.count);
-		HashMap<String, Number> output = (HashMap<String, Number>) quotientSink.tuple;
-		for (Map.Entry<String, Number> e : output.entrySet()) {
-			if (e.getKey().equals("a")) {
-				Assert.assertEquals("emitted value for 'a' was ", new Double(2),
-						e.getValue());
-			} else if (e.getKey().equals("b")) {
-				Assert.assertEquals("emitted tuple for 'b' was ", new Double(1),
-						e.getValue());
-			} else if (e.getKey().equals("c")) {
-				Assert.assertEquals("emitted tuple for 'c' was ", new Double(4),
-						e.getValue());
-			} else {
-				LOG.debug(String.format("key was %s", e.getKey()));
-			}
-		}
-	}
+    // One for each key
+    Assert.assertEquals("number emitted tuples", 1, quotientSink.count);
+    HashMap<String, Number> output = (HashMap<String, Number>)quotientSink.tuple;
+    for (Map.Entry<String, Number> e : output.entrySet()) {
+      if (e.getKey().equals("a")) {
+        Assert.assertEquals("emitted value for 'a' was ", 2d,
+            e.getValue());
+      } else if (e.getKey().equals("b")) {
+        Assert.assertEquals("emitted tuple for 'b' was ", 1d,
+            e.getValue());
+      } else if (e.getKey().equals("c")) {
+        Assert.assertEquals("emitted tuple for 'c' was ", 4d,
+            e.getValue());
+      } else {
+        LOG.debug(String.format("key was %s", e.getKey()));
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/math/QuotientTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/QuotientTest.java b/library/src/test/java/com/datatorrent/lib/math/QuotientTest.java
index 9b71427..604e45f 100644
--- a/library/src/test/java/com/datatorrent/lib/math/QuotientTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/QuotientTest.java
@@ -31,71 +31,72 @@ import com.datatorrent.api.Sink;
  */
 public class QuotientTest
 {
-	class TestSink implements Sink<Object>
-	{
-		List<Object> collectedTuples = new ArrayList<Object>();
 
-		@Override
-		public void put(Object payload)
-		{
-          collectedTuples.add(payload);
-		}
+  class TestSink implements Sink<Object>
+  {
+    List<Object> collectedTuples = new ArrayList<Object>();
 
-		@Override
-		public int getCount(boolean reset)
-		{
-			throw new UnsupportedOperationException("Not supported yet.");
-		}
-	}
+    @Override
+    public void put(Object payload)
+    {
+      collectedTuples.add(payload);
+    }
 
-	/**
-	 * Test oper logic emits correct results.
-	 */
-	@Test
-	public void testNodeSchemaProcessing()
-	{
-		Quotient<Double> oper = new Quotient<Double>();
-		TestSink quotientSink = new TestSink();
-		oper.quotient.setSink(quotientSink);
+    @Override
+    public int getCount(boolean reset)
+    {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+  }
 
-		oper.setMult_by(2);
+  /**
+   * Test oper logic emits correct results.
+   */
+  @Test
+  public void testNodeSchemaProcessing()
+  {
+    Quotient<Double> oper = new Quotient<Double>();
+    TestSink quotientSink = new TestSink();
+    oper.quotient.setSink(quotientSink);
 
-		oper.beginWindow(0); //
-		Double a = new Double(30.0);
-		Double b = new Double(20.0);
-		Double c = new Double(100.0);
-		oper.denominator.process(a);
-		oper.denominator.process(b);
-		oper.denominator.process(c);
+    oper.setMult_by(2);
 
-		a = 5.0;
-		oper.numerator.process(a);
-		a = 1.0;
-		oper.numerator.process(a);
-		b = 44.0;
-		oper.numerator.process(b);
+    oper.beginWindow(0); //
+    Double a = 30.0;
+    Double b = 20.0;
+    Double c = 100.0;
+    oper.denominator.process(a);
+    oper.denominator.process(b);
+    oper.denominator.process(c);
 
-		b = 10.0;
-		oper.numerator.process(b);
-		c = 22.0;
-		oper.numerator.process(c);
-		c = 18.0;
-		oper.numerator.process(c);
+    a = 5.0;
+    oper.numerator.process(a);
+    a = 1.0;
+    oper.numerator.process(a);
+    b = 44.0;
+    oper.numerator.process(b);
 
-		a = 0.5;
-		oper.numerator.process(a);
-		b = 41.5;
-		oper.numerator.process(b);
-		a = 8.0;
-		oper.numerator.process(a);
-		oper.endWindow(); //
+    b = 10.0;
+    oper.numerator.process(b);
+    c = 22.0;
+    oper.numerator.process(c);
+    c = 18.0;
+    oper.numerator.process(c);
 
-		// payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e"
-		Assert.assertEquals("number emitted tuples", 1,
-				quotientSink.collectedTuples.size());
-		for (Object o : quotientSink.collectedTuples) { // sum is 1157
-			Double val = (Double) o;
-			Assert.assertEquals("emitted quotient value was ", new Double(2.0), val);
-		}
-	}
+    a = 0.5;
+    oper.numerator.process(a);
+    b = 41.5;
+    oper.numerator.process(b);
+    a = 8.0;
+    oper.numerator.process(a);
+    oper.endWindow(); //
+
+    // payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e"
+    Assert.assertEquals("number emitted tuples", 1,
+        quotientSink.collectedTuples.size());
+    for (Object o : quotientSink.collectedTuples) { // sum is 1157
+      Double val = (Double)o;
+      Assert.assertEquals("emitted quotient value was ", new Double(2.0), val);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/math/RangeKeyValTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/RangeKeyValTest.java b/library/src/test/java/com/datatorrent/lib/math/RangeKeyValTest.java
index a6d8bc8..57727b4 100644
--- a/library/src/test/java/com/datatorrent/lib/math/RangeKeyValTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/RangeKeyValTest.java
@@ -60,33 +60,29 @@ public class RangeKeyValTest<V extends Number>
     int numtuples = 1000;
     if (type.equals("integer")) {
       for (int i = -10; i < numtuples; i++) {
-        node.data.process(new KeyValPair<String, Integer>("a", new Integer(i)));
+        node.data.process(new KeyValPair<String, Integer>("a", i));
       }
-    }
-    else if (type.equals("double")) {
+    } else if (type.equals("double")) {
       for (int i = -10; i < numtuples; i++) {
-        node.data.process(new KeyValPair<String, Double>("a", new Double(i)));
+        node.data.process(new KeyValPair<String, Double>("a", (double)i));
       }
-    }
-    else if (type.equals("long")) {
+    } else if (type.equals("long")) {
       for (int i = -10; i < numtuples; i++) {
-        node.data.process(new KeyValPair<String, Long>("a", new Long(i)));
+        node.data.process(new KeyValPair<String, Long>("a", (long)i));
       }
-    }
-    else if (type.equals("short")) {
+    } else if (type.equals("short")) {
       for (short i = -10; i < numtuples; i++) {
-        node.data.process(new KeyValPair<String, Short>("a", new Short(i)));
+        node.data.process(new KeyValPair<String, Short>("a", i));
       }
-    }
-    else if (type.equals("float")) {
+    } else if (type.equals("float")) {
       for (int i = -10; i < numtuples; i++) {
-        node.data.process(new KeyValPair<String, Float>("a", new Float(i)));
+        node.data.process(new KeyValPair<String, Float>("a", (float)i));
       }
     }
 
     node.endWindow();
-    Assert.assertEquals("high was ", new Double(999.0), rangeSink.high, 0);
-    Assert.assertEquals("low was ", new Double(-10.0), rangeSink.low, 0);
+    Assert.assertEquals("high was ", 999.0, rangeSink.high, 0);
+    Assert.assertEquals("low was ", -10.0, rangeSink.low, 0);
     log.debug(String.format("\nTested %d tuples", numtuples));
   }
 
@@ -100,8 +96,8 @@ public class RangeKeyValTest<V extends Number>
     @Override
     public void put(Object payload)
     {
-      KeyValPair<String, Object> tuple = (KeyValPair<String, Object>) payload;
-      HighLow<V> hl = (HighLow<V>) tuple.getValue();
+      KeyValPair<String, Object> tuple = (KeyValPair<String, Object>)payload;
+      HighLow<V> hl = (HighLow<V>)tuple.getValue();
       high = hl.getHigh().doubleValue();
       low = hl.getLow().doubleValue();
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/math/RangeTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/RangeTest.java b/library/src/test/java/com/datatorrent/lib/math/RangeTest.java
index 931486f..c708c2a 100644
--- a/library/src/test/java/com/datatorrent/lib/math/RangeTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/RangeTest.java
@@ -33,80 +33,80 @@ import com.datatorrent.lib.util.HighLow;
  */
 public class RangeTest<V extends Number>
 {
-	@SuppressWarnings("rawtypes")
-	class TestSink implements Sink
-	{
-		List<Object> collectedTuples = new ArrayList<Object>();
-
-		@Override
-		public void put(Object payload)
-		{
-          collectedTuples.add(payload);
-		}
-
-		@Override
-		public int getCount(boolean reset)
-		{
-			throw new UnsupportedOperationException("Not supported yet.");
-		}
-	}
-
-	/**
-	 * Test oper logic emits correct results
-	 */
-	@SuppressWarnings("unchecked")
-	@Test
-	public void testNodeSchemaProcessing()
-	{
-		Range<Double> oper = new Range<Double>();
-		TestSink rangeSink = new TestSink();
-		oper.range.setSink(rangeSink);
-
-		oper.beginWindow(0); //
-
-		int numTuples = 1000;
-		for (int i = 0; i < numTuples; i++) {
-			Double a = new Double(20.0);
-			Double b = new Double(2.0);
-			Double c = new Double(1000.0);
-
-			oper.data.process(a);
-			oper.data.process(b);
-			oper.data.process(c);
-
-			a = 1.0;
-			oper.data.process(a);
-			a = 10.0;
-			oper.data.process(a);
-			b = 5.0;
-			oper.data.process(b);
-
-			b = 12.0;
-			oper.data.process(b);
-			c = 22.0;
-			oper.data.process(c);
-			c = 14.0;
-			oper.data.process(c);
-
-			a = 46.0;
-			oper.data.process(a);
-			b = 2.0;
-			oper.data.process(b);
-			a = 23.0;
-			oper.data.process(a);
-		}
-
-		oper.endWindow(); //
-
-		// payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e"
-		Assert.assertEquals("number emitted tuples", 1,
-				rangeSink.collectedTuples.size());
-		for (Object o : rangeSink.collectedTuples) {
-			HighLow<V> hl = (HighLow<V>) o;
-			Assert.assertEquals("emitted high value was ", new Double(1000.0),
-					hl.getHigh());
-			Assert.assertEquals("emitted low value was ", new Double(1.0),
-					hl.getLow());
-		}
-	}
+
+  @SuppressWarnings("rawtypes")
+  class TestSink implements Sink
+  {
+    List<Object> collectedTuples = new ArrayList<Object>();
+
+    @Override
+    public void put(Object payload)
+    {
+      collectedTuples.add(payload);
+    }
+
+    @Override
+    public int getCount(boolean reset)
+    {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+  }
+
+  /**
+   * Test oper logic emits correct results
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testNodeSchemaProcessing()
+  {
+    Range<Double> oper = new Range<Double>();
+    TestSink rangeSink = new TestSink();
+    oper.range.setSink(rangeSink);
+
+    oper.beginWindow(0); //
+
+    int numTuples = 1000;
+    for (int i = 0; i < numTuples; i++) {
+      Double a = new Double(20.0);
+      Double b = new Double(2.0);
+      Double c = new Double(1000.0);
+
+      oper.data.process(a);
+      oper.data.process(b);
+      oper.data.process(c);
+
+      a = 1.0;
+      oper.data.process(a);
+      a = 10.0;
+      oper.data.process(a);
+      b = 5.0;
+      oper.data.process(b);
+
+      b = 12.0;
+      oper.data.process(b);
+      c = 22.0;
+      oper.data.process(c);
+      c = 14.0;
+      oper.data.process(c);
+
+      a = 46.0;
+      oper.data.process(a);
+      b = 2.0;
+      oper.data.process(b);
+      a = 23.0;
+      oper.data.process(a);
+    }
+
+    oper.endWindow(); //
+
+    // payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e"
+    Assert.assertEquals("number emitted tuples", 1, rangeSink.collectedTuples.size());
+    for (Object o : rangeSink.collectedTuples) {
+      HighLow<V> hl = (HighLow<V>)o;
+      Assert.assertEquals("emitted high value was ", 1000.0,
+          hl.getHigh());
+      Assert.assertEquals("emitted low value was ", 1.0,
+          hl.getLow());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/math/RunningAverageTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/RunningAverageTest.java b/library/src/test/java/com/datatorrent/lib/math/RunningAverageTest.java
index d84ecf3..52da631 100644
--- a/library/src/test/java/com/datatorrent/lib/math/RunningAverageTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/RunningAverageTest.java
@@ -18,50 +18,50 @@
  */
 package com.datatorrent.lib.math;
 
-import static org.junit.Assert.assertEquals;
-
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  * Functional tests for {@link com.datatorrent.lib.math.RunningAverage}
  */
 public class RunningAverageTest
 {
-	public RunningAverageTest()
-	{
-	}
+  public RunningAverageTest()
+  {
+  }
 
-	@Test
-	public void testLogicForSmallValues()
-	{
-		logger.debug("small values");
-		RunningAverage instance = new RunningAverage();
-		instance.input.process(1.0);
+  @Test
+  public void testLogicForSmallValues()
+  {
+    logger.debug("small values");
+    RunningAverage instance = new RunningAverage();
+    instance.input.process(1.0);
 
-		assertEquals("first average", 1.0, instance.average, 0.00001);
-		assertEquals("first count", 1, instance.count);
+    assertEquals("first average", 1.0, instance.average, 0.00001);
+    assertEquals("first count", 1, instance.count);
 
-		instance.input.process(2.0);
+    instance.input.process(2.0);
 
-		assertEquals("second average", 1.5, instance.average, 0.00001);
-		assertEquals("second count", 2, instance.count);
-	}
+    assertEquals("second average", 1.5, instance.average, 0.00001);
+    assertEquals("second count", 2, instance.count);
+  }
 
-	@Test
-	public void testLogicForLargeValues()
-	{
-		logger.debug("large values");
-		RunningAverage instance = new RunningAverage();
-		instance.input.process(Long.MAX_VALUE);
+  @Test
+  public void testLogicForLargeValues()
+  {
+    logger.debug("large values");
+    RunningAverage instance = new RunningAverage();
+    instance.input.process(Long.MAX_VALUE);
 
-		assertEquals("first average", Long.MAX_VALUE, (long) instance.average);
+    assertEquals("first average", Long.MAX_VALUE, (long)instance.average);
 
-		instance.input.process(Long.MAX_VALUE);
-		assertEquals("second average", Long.MAX_VALUE, (long) instance.average);
-	}
+    instance.input.process(Long.MAX_VALUE);
+    assertEquals("second average", Long.MAX_VALUE, (long)instance.average);
+  }
 
-	private static final Logger logger = LoggerFactory
-			.getLogger(RunningAverageTest.class);
+  private static final Logger logger = LoggerFactory
+      .getLogger(RunningAverageTest.class);
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/math/SigmaTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/SigmaTest.java b/library/src/test/java/com/datatorrent/lib/math/SigmaTest.java
index 671826a..e968dba 100644
--- a/library/src/test/java/com/datatorrent/lib/math/SigmaTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/SigmaTest.java
@@ -33,42 +33,42 @@ import com.datatorrent.lib.testbench.SumTestSink;
  */
 public class SigmaTest
 {
-	/**
-	 * Test oper logic emits correct results
-	 */
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
-	public void testNodeSchemaProcessing()
-	{
-		Sigma oper = new Sigma();
-		SumTestSink lmultSink = new SumTestSink();
-		SumTestSink imultSink = new SumTestSink();
-		SumTestSink dmultSink = new SumTestSink();
-		SumTestSink fmultSink = new SumTestSink();
-		oper.longResult.setSink(lmultSink);
-		oper.integerResult.setSink(imultSink);
-		oper.doubleResult.setSink(dmultSink);
-		oper.floatResult.setSink(fmultSink);
+  /**
+   * Test oper logic emits correct results
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testNodeSchemaProcessing()
+  {
+    Sigma oper = new Sigma();
+    SumTestSink lmultSink = new SumTestSink();
+    SumTestSink imultSink = new SumTestSink();
+    SumTestSink dmultSink = new SumTestSink();
+    SumTestSink fmultSink = new SumTestSink();
+    oper.longResult.setSink(lmultSink);
+    oper.integerResult.setSink(imultSink);
+    oper.doubleResult.setSink(dmultSink);
+    oper.floatResult.setSink(fmultSink);
 
-		int sum = 0;
-		ArrayList<Integer> list = new ArrayList<Integer>();
-		for (int i = 0; i < 100; i++) {
-			list.add(i);
-			sum += i;
-		}
+    int sum = 0;
+    ArrayList<Integer> list = new ArrayList<Integer>();
+    for (int i = 0; i < 100; i++) {
+      list.add(i);
+      sum += i;
+    }
 
-		oper.beginWindow(0); //
-		oper.input.process(list);
-		oper.endWindow(); //
+    oper.beginWindow(0); //
+    oper.input.process(list);
+    oper.endWindow(); //
 
-		oper.beginWindow(1); //
-		oper.input.process(list);
-		oper.endWindow(); //
-		sum = sum * 2;
+    oper.beginWindow(1); //
+    oper.input.process(list);
+    oper.endWindow(); //
+    sum = sum * 2;
 
-		Assert.assertEquals("sum was", sum, lmultSink.val.intValue());
-		Assert.assertEquals("sum was", sum, imultSink.val.intValue());
-		Assert.assertEquals("sum was", sum, dmultSink.val.intValue());
-		Assert.assertEquals("sum", sum, fmultSink.val.intValue());
-	}
+    Assert.assertEquals("sum was", sum, lmultSink.val.intValue());
+    Assert.assertEquals("sum was", sum, imultSink.val.intValue());
+    Assert.assertEquals("sum was", sum, dmultSink.val.intValue());
+    Assert.assertEquals("sum", sum, fmultSink.val.intValue());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/math/SquareCalculusTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/SquareCalculusTest.java b/library/src/test/java/com/datatorrent/lib/math/SquareCalculusTest.java
index 27f7464..7f5ab53 100644
--- a/library/src/test/java/com/datatorrent/lib/math/SquareCalculusTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/SquareCalculusTest.java
@@ -28,35 +28,35 @@ import com.datatorrent.lib.testbench.SumTestSink;
  */
 public class SquareCalculusTest
 {
-	/**
-	 * Test oper logic emits correct results
-	 */
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
-	public void testNodeSchemaProcessing()
-	{
-		SquareCalculus oper = new SquareCalculus();
-		SumTestSink lmultSink = new SumTestSink();
-		SumTestSink imultSink = new SumTestSink();
-		SumTestSink dmultSink = new SumTestSink();
-		SumTestSink fmultSink = new SumTestSink();
-		oper.longResult.setSink(lmultSink);
-		oper.integerResult.setSink(imultSink);
-		oper.doubleResult.setSink(dmultSink);
-		oper.floatResult.setSink(fmultSink);
+  /**
+   * Test oper logic emits correct results
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Test
+  public void testNodeSchemaProcessing()
+  {
+    SquareCalculus oper = new SquareCalculus();
+    SumTestSink lmultSink = new SumTestSink();
+    SumTestSink imultSink = new SumTestSink();
+    SumTestSink dmultSink = new SumTestSink();
+    SumTestSink fmultSink = new SumTestSink();
+    oper.longResult.setSink(lmultSink);
+    oper.integerResult.setSink(imultSink);
+    oper.doubleResult.setSink(dmultSink);
+    oper.floatResult.setSink(fmultSink);
 
-		oper.beginWindow(0); //
-		int sum = 0;
-		for (int i = 0; i < 50; i++) {
-			Integer t = i;
-			oper.input.process(t);
-			sum += i * i;
-		}
-		oper.endWindow(); //
+    oper.beginWindow(0); //
+    int sum = 0;
+    for (int i = 0; i < 50; i++) {
+      Integer t = i;
+      oper.input.process(t);
+      sum += i * i;
+    }
+    oper.endWindow(); //
 
-		Assert.assertEquals("sum was", sum, lmultSink.val.intValue());
-		Assert.assertEquals("sum was", sum, imultSink.val.intValue());
-		Assert.assertEquals("sum was", sum, dmultSink.val.intValue());
-		Assert.assertEquals("sum", sum, fmultSink.val.intValue());
-	}
+    Assert.assertEquals("sum was", sum, lmultSink.val.intValue());
+    Assert.assertEquals("sum was", sum, imultSink.val.intValue());
+    Assert.assertEquals("sum was", sum, dmultSink.val.intValue());
+    Assert.assertEquals("sum", sum, fmultSink.val.intValue());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/math/SumCountMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/SumCountMapTest.java b/library/src/test/java/com/datatorrent/lib/math/SumCountMapTest.java
index 5cca950..b0c7b38 100644
--- a/library/src/test/java/com/datatorrent/lib/math/SumCountMapTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/SumCountMapTest.java
@@ -31,125 +31,124 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
  */
 public class SumCountMapTest
 {
-	/**
-	 * Test operator logic emits correct results.
-	 */
-	@Test
-	public void testNodeProcessing()
-	{
-		testNodeSchemaProcessing(true, true);
-		testNodeSchemaProcessing(true, false);
-		testNodeSchemaProcessing(false, true);
-		testNodeSchemaProcessing(false, false);
-	}
+  /**
+   * Test operator logic emits correct results.
+   */
+  @Test
+  public void testNodeProcessing()
+  {
+    testNodeSchemaProcessing(true, true);
+    testNodeSchemaProcessing(true, false);
+    testNodeSchemaProcessing(false, true);
+    testNodeSchemaProcessing(false, false);
+  }
 
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	public void testNodeSchemaProcessing(boolean sum, boolean count)
-	{
-		SumCountMap<String, Double> oper = new SumCountMap<String, Double>();
-		oper.setType(Double.class);
-		CollectorTestSink sumSink = new CollectorTestSink();
-		CollectorTestSink countSink = new CollectorTestSink();
-		if (sum) {
-			oper.sum.setSink(sumSink);
-		}
-		if (count) {
-			oper.count.setSink(countSink);
-		}
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public void testNodeSchemaProcessing(boolean sum, boolean count)
+  {
+    SumCountMap<String, Double> oper = new SumCountMap<String, Double>();
+    oper.setType(Double.class);
+    CollectorTestSink sumSink = new CollectorTestSink();
+    CollectorTestSink countSink = new CollectorTestSink();
+    if (sum) {
+      oper.sum.setSink(sumSink);
+    }
+    if (count) {
+      oper.count.setSink(countSink);
+    }
 
-		oper.beginWindow(0); //
+    oper.beginWindow(0); //
 
-		HashMap<String, Double> input = new HashMap<String, Double>();
+    HashMap<String, Double> input = new HashMap<String, Double>();
 
-		input.put("a", 2.0);
-		input.put("b", 20.0);
-		input.put("c", 1000.0);
-		oper.data.process(input);
-		input.clear();
-		input.put("a", 1.0);
-		oper.data.process(input);
-		input.clear();
-		input.put("a", 10.0);
-		input.put("b", 5.0);
-		oper.data.process(input);
-		input.clear();
-		input.put("d", 55.0);
-		input.put("b", 12.0);
-		oper.data.process(input);
-		input.clear();
-		input.put("d", 22.0);
-		oper.data.process(input);
-		input.clear();
-		input.put("d", 14.2);
-		oper.data.process(input);
-		input.clear();
+    input.put("a", 2.0);
+    input.put("b", 20.0);
+    input.put("c", 1000.0);
+    oper.data.process(input);
+    input.clear();
+    input.put("a", 1.0);
+    oper.data.process(input);
+    input.clear();
+    input.put("a", 10.0);
+    input.put("b", 5.0);
+    oper.data.process(input);
+    input.clear();
+    input.put("d", 55.0);
+    input.put("b", 12.0);
+    oper.data.process(input);
+    input.clear();
+    input.put("d", 22.0);
+    oper.data.process(input);
+    input.clear();
+    input.put("d", 14.2);
+    oper.data.process(input);
+    input.clear();
 
-		// Mix integers and doubles
-		HashMap<String, Double> inputi = new HashMap<String, Double>();
-		inputi.put("d", 46.0);
-		inputi.put("e", 2.0);
-		oper.data.process(inputi);
-		inputi.clear();
-		inputi.put("a", 23.0);
-		inputi.put("d", 4.0);
-		oper.data.process(inputi);
-		inputi.clear();
+    // Mix integers and doubles
+    HashMap<String, Double> inputi = new HashMap<String, Double>();
+    inputi.put("d", 46.0);
+    inputi.put("e", 2.0);
+    oper.data.process(inputi);
+    inputi.clear();
+    inputi.put("a", 23.0);
+    inputi.put("d", 4.0);
+    oper.data.process(inputi);
+    inputi.clear();
 
-		oper.endWindow(); //
+    oper.endWindow(); //
 
-		if (sum) {
-			// payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e"
-			Assert.assertEquals("number emitted tuples", 1,
-					sumSink.collectedTuples.size());
-			for (Object o : sumSink.collectedTuples) {
-				HashMap<String, Object> output = (HashMap<String, Object>) o;
-				for (Map.Entry<String, Object> e : output.entrySet()) {
-					Double val = (Double) e.getValue();
-					if (e.getKey().equals("a")) {
-						Assert.assertEquals("emitted value for 'a' was ", new Double(36),
-								val);
-					} else if (e.getKey().equals("b")) {
-						Assert.assertEquals("emitted tuple for 'b' was ", new Double(37),
-								val);
-					} else if (e.getKey().equals("c")) {
-						Assert.assertEquals("emitted tuple for 'c' was ", new Double(1000),
-								val);
-					} else if (e.getKey().equals("d")) {
-						Assert.assertEquals("emitted tuple for 'd' was ",
-								new Double(141.2), val);
-					} else if (e.getKey().equals("e")) {
-						Assert.assertEquals("emitted tuple for 'e' was ", new Double(2),
-								val);
-					}
-				}
-			}
-		}
-		if (count) {
-			// payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e"
-			Assert.assertEquals("number emitted tuples", 1,
-					countSink.collectedTuples.size());
-			for (Object o : countSink.collectedTuples) {
-				HashMap<String, Object> output = (HashMap<String, Object>) o;
-				for (Map.Entry<String, Object> e : output.entrySet()) {
-					Integer val = (Integer) e.getValue();
-					if (e.getKey().equals("a")) {
-						Assert
-								.assertEquals("emitted value for 'a' was ", 4, val.intValue());
-					} else if (e.getKey().equals("b")) {
-						Assert
-								.assertEquals("emitted tuple for 'b' was ", 3, val.intValue());
-					} else if (e.getKey().equals("c")) {
-						Assert
-								.assertEquals("emitted tuple for 'c' was ", 1, val.intValue());
-					} else if (e.getKey().equals("d")) {
-						Assert
-								.assertEquals("emitted tuple for 'd' was ", 5, val.intValue());
-					} else if (e.getKey().equals("e")) {
-						Assert
-								.assertEquals("emitted tuple for 'e' was ", 1, val.intValue());
-					}
-				}
-			}
-		}
-	}
+    if (sum) {
+      // payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e"
+      Assert.assertEquals("number emitted tuples", 1, sumSink.collectedTuples.size());
+
+      for (Object o : sumSink.collectedTuples) {
+        HashMap<String, Object> output = (HashMap<String, Object>)o;
+        for (Map.Entry<String, Object> e : output.entrySet()) {
+          Double val = (Double)e.getValue();
+          if (e.getKey().equals("a")) {
+            Assert.assertEquals("emitted value for 'a' was ", new Double(36),
+                val);
+          } else if (e.getKey().equals("b")) {
+            Assert.assertEquals("emitted tuple for 'b' was ", new Double(37),
+                val);
+          } else if (e.getKey().equals("c")) {
+            Assert.assertEquals("emitted tuple for 'c' was ", new Double(1000),
+                val);
+          } else if (e.getKey().equals("d")) {
+            Assert.assertEquals("emitted tuple for 'd' was ",
+                new Double(141.2), val);
+          } else if (e.getKey().equals("e")) {
+            Assert.assertEquals("emitted tuple for 'e' was ", new Double(2),
+                val);
+          }
+        }
+      }
+    }
+    if (count) {
+      // payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e"
+      Assert.assertEquals("number emitted tuples", 1, countSink.collectedTuples.size());
+      for (Object o : countSink.collectedTuples) {
+        HashMap<String, Object> output = (HashMap<String, Object>)o;
+        for (Map.Entry<String, Object> e : output.entrySet()) {
+          Integer val = (Integer)e.getValue();
+          if (e.getKey().equals("a")) {
+            Assert
+                .assertEquals("emitted value for 'a' was ", 4, val.intValue());
+          } else if (e.getKey().equals("b")) {
+            Assert
+                .assertEquals("emitted tuple for 'b' was ", 3, val.intValue());
+          } else if (e.getKey().equals("c")) {
+            Assert
+                .assertEquals("emitted tuple for 'c' was ", 1, val.intValue());
+          } else if (e.getKey().equals("d")) {
+            Assert
+                .assertEquals("emitted tuple for 'd' was ", 5, val.intValue());
+          } else if (e.getKey().equals("e")) {
+            Assert
+                .assertEquals("emitted tuple for 'e' was ", 1, val.intValue());
+          }
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/math/SumKeyValTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/SumKeyValTest.java b/library/src/test/java/com/datatorrent/lib/math/SumKeyValTest.java
index 94e4806..336ddcb 100644
--- a/library/src/test/java/com/datatorrent/lib/math/SumKeyValTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/SumKeyValTest.java
@@ -33,7 +33,7 @@ public class SumKeyValTest
    * Test operator logic emits correct results.
    */
   @SuppressWarnings({ "unchecked", "rawtypes" })
-	@Test
+  @Test
   public void testNodeProcessing()
   {
     SumKeyVal<String, Double> oper = new SumKeyVal<String, Double>();
@@ -67,17 +67,13 @@ public class SumKeyValTest
       Double val = (Double)e.getValue();
       if (e.getKey().equals("a")) {
         Assert.assertEquals("emitted value for 'a' was ", new Double(36), val);
-      }
-      else if (e.getKey().equals("b")) {
+      } else if (e.getKey().equals("b")) {
         Assert.assertEquals("emitted tuple for 'b' was ", new Double(37), val);
-      }
-      else if (e.getKey().equals("c")) {
+      } else if (e.getKey().equals("c")) {
         Assert.assertEquals("emitted tuple for 'c' was ", new Double(1000), val);
-      }
-      else if (e.getKey().equals("d")) {
+      } else if (e.getKey().equals("d")) {
         Assert.assertEquals("emitted tuple for 'd' was ", new Double(141.2), val);
-      }
-      else if (e.getKey().equals("e")) {
+      } else if (e.getKey().equals("e")) {
         Assert.assertEquals("emitted tuple for 'e' was ", new Double(2), val);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/math/SumTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/SumTest.java b/library/src/test/java/com/datatorrent/lib/math/SumTest.java
index 447f56a..e9962f0 100644
--- a/library/src/test/java/com/datatorrent/lib/math/SumTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/SumTest.java
@@ -29,76 +29,76 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
  */
 public class SumTest
 {
-	/**
-	 * Test operator logic emits correct results.
-	 */
-	@Test
-	public void testNodeTypeProcessing()
-	{
-		Sum<Double> doper = new Sum<Double>();
-		Sum<Float> foper = new Sum<Float>();
-		Sum<Integer> ioper = new Sum<Integer>();
-		Sum<Long> loper = new Sum<Long>();
-		Sum<Short> soper = new Sum<Short>();
-		doper.setType(Double.class);
-		foper.setType(Float.class);
-		ioper.setType(Integer.class);
-		loper.setType(Long.class);
-		soper.setType(Short.class);
+  /**
+   * Test operator logic emits correct results.
+   */
+  @Test
+  public void testNodeTypeProcessing()
+  {
+    Sum<Double> doper = new Sum<Double>();
+    Sum<Float> foper = new Sum<Float>();
+    Sum<Integer> ioper = new Sum<Integer>();
+    Sum<Long> loper = new Sum<Long>();
+    Sum<Short> soper = new Sum<Short>();
+    doper.setType(Double.class);
+    foper.setType(Float.class);
+    ioper.setType(Integer.class);
+    loper.setType(Long.class);
+    soper.setType(Short.class);
 
-		testNodeSchemaProcessing(doper);
-		testNodeSchemaProcessing(foper);
-		testNodeSchemaProcessing(ioper);
-		testNodeSchemaProcessing(loper);
-		testNodeSchemaProcessing(soper);
-	}
+    testNodeSchemaProcessing(doper);
+    testNodeSchemaProcessing(foper);
+    testNodeSchemaProcessing(ioper);
+    testNodeSchemaProcessing(loper);
+    testNodeSchemaProcessing(soper);
+  }
 
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	public void testNodeSchemaProcessing(Sum oper)
-	{
-		CollectorTestSink sumSink = new CollectorTestSink();
-		oper.sum.setSink(sumSink);
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public void testNodeSchemaProcessing(Sum oper)
+  {
+    CollectorTestSink sumSink = new CollectorTestSink();
+    oper.sum.setSink(sumSink);
 
-		oper.beginWindow(0); //
+    oper.beginWindow(0); //
 
-		Double a = new Double(2.0);
-		Double b = new Double(20.0);
-		Double c = new Double(1000.0);
+    Double a = 2.0;
+    Double b = 20.0;
+    Double c = 1000.0;
 
-		oper.data.process(a);
-		oper.data.process(b);
-		oper.data.process(c);
+    oper.data.process(a);
+    oper.data.process(b);
+    oper.data.process(c);
 
-		a = 1.0;
-		oper.data.process(a);
-		a = 10.0;
-		oper.data.process(a);
-		b = 5.0;
-		oper.data.process(b);
+    a = 1.0;
+    oper.data.process(a);
+    a = 10.0;
+    oper.data.process(a);
+    b = 5.0;
+    oper.data.process(b);
 
-		b = 12.0;
-		oper.data.process(b);
-		c = 22.0;
-		oper.data.process(c);
-		c = 14.0;
-		oper.data.process(c);
+    b = 12.0;
+    oper.data.process(b);
+    c = 22.0;
+    oper.data.process(c);
+    c = 14.0;
+    oper.data.process(c);
 
-		a = 46.0;
-		oper.data.process(a);
-		b = 2.0;
-		oper.data.process(b);
-		a = 23.0;
-		oper.data.process(a);
+    a = 46.0;
+    oper.data.process(a);
+    b = 2.0;
+    oper.data.process(b);
+    a = 23.0;
+    oper.data.process(a);
 
-		oper.endWindow(); //
+    oper.endWindow(); //
 
-		// payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e"
-		Assert.assertEquals("number emitted tuples", 1,
-				sumSink.collectedTuples.size());
-		for (Object o : sumSink.collectedTuples) { // sum is 1157
-			Double val = ((Number) o).doubleValue();
-			Assert
-					.assertEquals("emitted sum value was was ", new Double(1157.0), val);
-		}
-	}
+    // payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e"
+    Assert.assertEquals("number emitted tuples", 1,
+        sumSink.collectedTuples.size());
+    for (Object o : sumSink.collectedTuples) { // sum is 1157
+
+      Double val = ((Number)o).doubleValue();
+      Assert.assertEquals("emitted sum value was was ", new Double(1157.0), val);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProductTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProductTest.java b/library/src/test/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProductTest.java
index 50ba311..dae96e4 100644
--- a/library/src/test/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProductTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProductTest.java
@@ -112,7 +112,8 @@ public class XmlKeyValueStringCartesianProductTest
     Assert.assertEquals("Output 4", "g=vg1,k=vk1", collectedTuples.get(7));
   }
 
-  List<String> testOperator(String xml, String config) {
+  List<String> testOperator(String xml, String config)
+  {
     XmlKeyValueStringCartesianProduct operator = new XmlKeyValueStringCartesianProduct();
     operator.setConfig(config);
     operator.setup(null);
@@ -123,7 +124,7 @@ public class XmlKeyValueStringCartesianProductTest
       public void put(Object o)
       {
         if (o instanceof String) {
-          collectedTuples.add((String) o);
+          collectedTuples.add((String)o);
         }
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/multiwindow/MultiWindowRangeKeyValTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/multiwindow/MultiWindowRangeKeyValTest.java b/library/src/test/java/com/datatorrent/lib/multiwindow/MultiWindowRangeKeyValTest.java
index e5ffea8..cad0e8c 100644
--- a/library/src/test/java/com/datatorrent/lib/multiwindow/MultiWindowRangeKeyValTest.java
+++ b/library/src/test/java/com/datatorrent/lib/multiwindow/MultiWindowRangeKeyValTest.java
@@ -31,37 +31,37 @@ import com.datatorrent.lib.util.KeyValPair;
  */
 public class MultiWindowRangeKeyValTest
 {
-	private static Logger log = LoggerFactory.getLogger(MultiWindowRangeKeyValTest.class);
+  private static Logger log = LoggerFactory.getLogger(MultiWindowRangeKeyValTest.class);
 
-	/**
-	 * Test functional logic
-	 */
-	@SuppressWarnings({ "rawtypes", "unchecked" })
+  /**
+   * Test functional logic
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
   @Test
-	public void testNodeProcessing() throws InterruptedException
-	{
-		MultiWindowRangeKeyVal<String, Integer> oper = new MultiWindowRangeKeyVal<String, Integer>();
+  public void testNodeProcessing() throws InterruptedException
+  {
+    MultiWindowRangeKeyVal<String, Integer> oper = new MultiWindowRangeKeyVal<String, Integer>();
 
-		CollectorTestSink swinSink = new CollectorTestSink();
-		oper.range.setSink(swinSink);
+    CollectorTestSink swinSink = new CollectorTestSink();
+    oper.range.setSink(swinSink);
 
-		oper.beginWindow(0);
-		KeyValPair<String, Integer> low = new KeyValPair<String, Integer>("a", 3);
-		oper.data.process(low);
-		KeyValPair<String, Integer> high = new KeyValPair<String, Integer>("a", 11);
-		oper.data.process(high);
-		oper.endWindow();
+    oper.beginWindow(0);
+    KeyValPair<String, Integer> low = new KeyValPair<String, Integer>("a", 3);
+    oper.data.process(low);
+    KeyValPair<String, Integer> high = new KeyValPair<String, Integer>("a", 11);
+    oper.data.process(high);
+    oper.endWindow();
 
-		oper.beginWindow(1);
-		low = new KeyValPair<String, Integer>("a", 1);
-		oper.data.process(low);
-		high = new KeyValPair<String, Integer>("a", 9);
-		oper.data.process(high);
-		oper.endWindow();
+    oper.beginWindow(1);
+    low = new KeyValPair<String, Integer>("a", 1);
+    oper.data.process(low);
+    high = new KeyValPair<String, Integer>("a", 9);
+    oper.data.process(high);
+    oper.endWindow();
 
-		Assert.assertEquals("number emitted tuples", 1, swinSink.collectedTuples.size());
-		for (Object o : swinSink.collectedTuples) {
-			log.debug(o.toString());
-		}
-	}
+    Assert.assertEquals("number emitted tuples", 1, swinSink.collectedTuples.size());
+    for (Object o : swinSink.collectedTuples) {
+      log.debug(o.toString());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/multiwindow/MultiWindowSumKeyValTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/multiwindow/MultiWindowSumKeyValTest.java b/library/src/test/java/com/datatorrent/lib/multiwindow/MultiWindowSumKeyValTest.java
index 5573faf..0f9ccbd 100644
--- a/library/src/test/java/com/datatorrent/lib/multiwindow/MultiWindowSumKeyValTest.java
+++ b/library/src/test/java/com/datatorrent/lib/multiwindow/MultiWindowSumKeyValTest.java
@@ -32,36 +32,36 @@ import com.datatorrent.lib.util.KeyValPair;
  */
 public class MultiWindowSumKeyValTest
 {
-	private static Logger log = LoggerFactory.getLogger(MultiWindowSumKeyValTest.class);
-	/**
-	 * Test functional logic
-	 */
-	@SuppressWarnings({ "rawtypes", "unchecked" })
+  private static Logger log = LoggerFactory.getLogger(MultiWindowSumKeyValTest.class);
+  /**
+   * Test functional logic
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
   @Test
-	public void testNodeProcessing() throws InterruptedException
-	{
-		MultiWindowSumKeyVal<String, Integer> oper = new MultiWindowSumKeyVal<String, Integer>();
+  public void testNodeProcessing() throws InterruptedException
+  {
+    MultiWindowSumKeyVal<String, Integer> oper = new MultiWindowSumKeyVal<String, Integer>();
 
-		CollectorTestSink swinSink = new CollectorTestSink();
-		oper.sum.setSink(swinSink);
+    CollectorTestSink swinSink = new CollectorTestSink();
+    oper.sum.setSink(swinSink);
 
-		oper.beginWindow(0);
-		KeyValPair<String, Integer> low = new KeyValPair<String, Integer>("a", 3);
-		oper.data.process(low);
-		KeyValPair<String, Integer> high = new KeyValPair<String, Integer>("a", 11);
-		oper.data.process(high);
-		oper.endWindow();
+    oper.beginWindow(0);
+    KeyValPair<String, Integer> low = new KeyValPair<String, Integer>("a", 3);
+    oper.data.process(low);
+    KeyValPair<String, Integer> high = new KeyValPair<String, Integer>("a", 11);
+    oper.data.process(high);
+    oper.endWindow();
 
-		oper.beginWindow(1);
-		low = new KeyValPair<String, Integer>("a", 1);
-		oper.data.process(low);
-		high = new KeyValPair<String, Integer>("a", 9);
-		oper.data.process(high);
-		oper.endWindow();
+    oper.beginWindow(1);
+    low = new KeyValPair<String, Integer>("a", 1);
+    oper.data.process(low);
+    high = new KeyValPair<String, Integer>("a", 9);
+    oper.data.process(high);
+    oper.endWindow();
 
-		Assert.assertEquals("number emitted tuples", 1, swinSink.collectedTuples.size());
-		for (Object o : swinSink.collectedTuples) {
-			log.debug(o.toString());
-		}
-	}
+    Assert.assertEquals("number emitted tuples", 1, swinSink.collectedTuples.size());
+    for (Object o : swinSink.collectedTuples) {
+      log.debug(o.toString());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/multiwindow/SimpleMovingAverageTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/multiwindow/SimpleMovingAverageTest.java b/library/src/test/java/com/datatorrent/lib/multiwindow/SimpleMovingAverageTest.java
index 75be4bb..0d6a006 100644
--- a/library/src/test/java/com/datatorrent/lib/multiwindow/SimpleMovingAverageTest.java
+++ b/library/src/test/java/com/datatorrent/lib/multiwindow/SimpleMovingAverageTest.java
@@ -54,11 +54,10 @@ public class SimpleMovingAverageTest
     oper.endWindow();
     Assert.assertEquals("number emitted tuples", 2, sink.collectedTuples.size());
     for (int i = 0; i < 2; i++) {
-      KeyValPair<String, Double> pair = (KeyValPair<String, Double>) sink.collectedTuples.get(i);
+      KeyValPair<String, Double> pair = (KeyValPair<String, Double>)sink.collectedTuples.get(i);
       if (pair.getKey().equals("a")) {
         Assert.assertEquals("a SMA", 31.5, pair.getValue(), 0);
-      }
-      else {
+      } else {
         Assert.assertEquals("b SMA", 52.5, pair.getValue(), 0);
       }
     }
@@ -71,11 +70,10 @@ public class SimpleMovingAverageTest
     oper.endWindow();
     Assert.assertEquals("number emitted tuples", 4, sink.collectedTuples.size());
     for (int i = 2; i < 4; i++) {
-      KeyValPair<String, Double> pair = (KeyValPair<String, Double>) sink.collectedTuples.get(i);
+      KeyValPair<String, Double> pair = (KeyValPair<String, Double>)sink.collectedTuples.get(i);
       if (pair.getKey().equals("a")) {
         Assert.assertEquals("a SMA", 32.5, pair.getValue(), 0);
-      }
-      else {
+      } else {
         Assert.assertEquals("b SMA", 53.5, pair.getValue(), 0);
       }
     }
@@ -88,11 +86,10 @@ public class SimpleMovingAverageTest
     oper.endWindow();
     Assert.assertEquals("number emitted tuples", 6, sink.collectedTuples.size());
     for (int i = 4; i < 6; i++) {
-      KeyValPair<String, Double> pair = (KeyValPair<String, Double>) sink.collectedTuples.get(i);
+      KeyValPair<String, Double> pair = (KeyValPair<String, Double>)sink.collectedTuples.get(i);
       if (pair.getKey().equals("a")) {
         Assert.assertEquals("a SMA", 33.5, pair.getValue(), 0);
-      }
-      else {
+      } else {
         Assert.assertEquals("b SMA", 54.5, pair.getValue(), 0);
       }
     }
@@ -105,11 +102,10 @@ public class SimpleMovingAverageTest
     oper.endWindow();
     Assert.assertEquals("number emitted tuples", 8, sink.collectedTuples.size());
     for (int i = 6; i < 8; i++) {
-      KeyValPair<String, Double> pair = (KeyValPair<String, Double>) sink.collectedTuples.get(i);
+      KeyValPair<String, Double> pair = (KeyValPair<String, Double>)sink.collectedTuples.get(i);
       if (pair.getKey().equals("a")) {
         Assert.assertEquals("a SMA", 35.5, pair.getValue(), 0);
-      }
-      else {
+      } else {
         Assert.assertEquals("b SMA", 56.5, pair.getValue(), 0);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/multiwindow/SlidingWindowTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/multiwindow/SlidingWindowTest.java b/library/src/test/java/com/datatorrent/lib/multiwindow/SlidingWindowTest.java
index 02fce04..1a59b97 100644
--- a/library/src/test/java/com/datatorrent/lib/multiwindow/SlidingWindowTest.java
+++ b/library/src/test/java/com/datatorrent/lib/multiwindow/SlidingWindowTest.java
@@ -37,23 +37,24 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
 public class SlidingWindowTest
 {
 
-	public class TestSlidingWindow extends AbstractSlidingWindow<String, List<String>>
-	{
-		public final transient DefaultOutputPort<ArrayList<String>> out = new DefaultOutputPort<ArrayList<String>>();
+  public class TestSlidingWindow extends AbstractSlidingWindow<String, List<String>>
+  {
+    public final transient DefaultOutputPort<ArrayList<String>> out = new DefaultOutputPort<ArrayList<String>>();
 
-		ArrayList<String> tuples = new ArrayList<String>();
+    ArrayList<String> tuples = new ArrayList<String>();
 
-		@Override protected void processDataTuple(String tuple)
-		{
-			tuples.add(tuple);
-		}
+    @Override
+    protected void processDataTuple(String tuple)
+    {
+      tuples.add(tuple);
+    }
 
-		@Override
-		public void endWindow()
-		{
-			out.emit(tuples);
-			tuples = new ArrayList<String>();
-		}
+    @Override
+    public void endWindow()
+    {
+      out.emit(tuples);
+      tuples = new ArrayList<String>();
+    }
 
     @Override
     public List<String> createWindowState()
@@ -61,47 +62,46 @@ public class SlidingWindowTest
       return tuples;
     }
 
-	};
+  }
 
-	/**
-	 * Test functional logic
-	 */
-	@SuppressWarnings({ "rawtypes", "unchecked" })
+  /**
+   * Test functional logic
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
   @Test
-	public void testNodeProcessing() throws InterruptedException
-	{
-	  TestSlidingWindow oper = new TestSlidingWindow();
-
-		CollectorTestSink swinSink = new CollectorTestSink();
-		oper.out.setSink(swinSink);
-		oper.setWindowSize(3);
-		oper.setup(null);
-
-		oper.beginWindow(0);
-		oper.data.process("a0");
-		oper.data.process("b0");
-		oper.endWindow();
-
-		oper.beginWindow(1);
-		oper.data.process("a1");
-		oper.data.process("b1");
-		oper.endWindow();
-
-		oper.beginWindow(2);
-		oper.data.process("a2");
-		oper.data.process("b2");
-		oper.endWindow();
-
-		oper.beginWindow(3);
-		oper.data.process("a3");
-		oper.data.process("b3");
-		oper.endWindow();
-
-		Assert.assertEquals("number emitted tuples", 4,
-      swinSink.collectedTuples.size());
-		
-		Assert.assertEquals("Invalid second stream window state.", oper.getStreamingWindowState(1), Lists.newArrayList("a2", "b2"));
-		Assert.assertEquals("Invalid expired stream window state.", oper.lastExpiredWindowState, Lists.newArrayList("a0", "b0"));
-
-	}
+  public void testNodeProcessing() throws InterruptedException
+  {
+    TestSlidingWindow oper = new TestSlidingWindow();
+
+    CollectorTestSink swinSink = new CollectorTestSink();
+    oper.out.setSink(swinSink);
+    oper.setWindowSize(3);
+    oper.setup(null);
+
+    oper.beginWindow(0);
+    oper.data.process("a0");
+    oper.data.process("b0");
+    oper.endWindow();
+
+    oper.beginWindow(1);
+    oper.data.process("a1");
+    oper.data.process("b1");
+    oper.endWindow();
+
+    oper.beginWindow(2);
+    oper.data.process("a2");
+    oper.data.process("b2");
+    oper.endWindow();
+
+    oper.beginWindow(3);
+    oper.data.process("a3");
+    oper.data.process("b3");
+    oper.endWindow();
+
+    Assert.assertEquals("number emitted tuples", 4, swinSink.collectedTuples.size());
+
+    Assert.assertEquals("Invalid second stream window state.", oper.getStreamingWindowState(1), Lists.newArrayList("a2", "b2"));
+    Assert.assertEquals("Invalid expired stream window state.", oper.lastExpiredWindowState, Lists.newArrayList("a0", "b0"));
+
+  }
 }