You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/05/18 20:41:57 UTC

[07/22] incubator-apex-malhar git commit: APEXMALHAR-2095 removed checkstyle violations of malhar library module

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalBatchOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalBatchOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalBatchOutputOperatorTest.java
index d78df94..63c75ef 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalBatchOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalBatchOutputOperatorTest.java
@@ -18,20 +18,33 @@
  */
 package com.datatorrent.lib.db.jdbc;
 
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Random;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.Operator.ProcessingMode;
-import com.datatorrent.netlet.util.DTThrowable;
-import static com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.*;
 import com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.TestEvent;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.netlet.util.DTThrowable;
+
+import static com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.APP_ID;
+import static com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.OPERATOR_ID;
+import static com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.TABLE_NAME;
 import static com.datatorrent.lib.db.jdbc.JdbcOperatorTest.DB_DRIVER;
 import static com.datatorrent.lib.db.jdbc.JdbcOperatorTest.URL;
-import com.datatorrent.lib.helper.OperatorContextTestHelper;
-import java.sql.*;
-import java.util.Random;
-import org.junit.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Test for {@link AbstractJdbcNonTransactionableBatchOutputOperator}
@@ -54,17 +67,7 @@ public class JdbcNonTransactionalBatchOutputOperatorTest
     try {
       Class.forName(DB_DRIVER).newInstance();
       con = DriverManager.getConnection(URL);
-    }
-    catch (SQLException ex) {
-      DTThrowable.rethrow(ex);
-    }
-    catch (ClassNotFoundException ex) {
-      DTThrowable.rethrow(ex);
-    }
-    catch (InstantiationException ex) {
-      DTThrowable.rethrow(ex);
-    }
-    catch (IllegalAccessException ex) {
+    } catch (SQLException | InstantiationException | IllegalAccessException | ClassNotFoundException ex) {
       DTThrowable.rethrow(ex);
     }
   }
@@ -77,8 +80,7 @@ public class JdbcNonTransactionalBatchOutputOperatorTest
 
     try {
       con.close();
-    }
-    catch (SQLException ex) {
+    } catch (SQLException ex) {
       DTThrowable.rethrow(ex);
     }
   }
@@ -108,8 +110,7 @@ public class JdbcNonTransactionalBatchOutputOperatorTest
         int count = resultSet.getInt(1);
         stmt.close();
         return count;
-      }
-      catch (SQLException e) {
+      } catch (SQLException e) {
         throw new RuntimeException("fetching count", e);
       }
     }
@@ -145,55 +146,40 @@ public class JdbcNonTransactionalBatchOutputOperatorTest
 
     outputOperator.beginWindow(0);
 
-    for(int batchCounter = 0;
-        batchCounter < BATCH_SIZE;
-        batchCounter++) {
+    for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) {
       outputOperator.input.put(new TestEvent(random.nextInt()));
     }
 
     outputOperator.endWindow();
 
-    Assert.assertEquals("Commit window id ",
-                        0,
-                        outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
-    Assert.assertEquals("Batch should be written",
-                        BATCH_SIZE,
-                        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+    Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+    Assert.assertEquals("Batch should be written", BATCH_SIZE,
+        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
 
     outputOperator.beginWindow(1);
 
-    for(int batchCounter = 0;
-        batchCounter < HALF_BATCH_SIZE;
-        batchCounter++) {
+    for (int batchCounter = 0; batchCounter < HALF_BATCH_SIZE; batchCounter++) {
       outputOperator.input.put(new TestEvent(random.nextInt()));
     }
 
     outputOperator.endWindow();
 
-    Assert.assertEquals("Commit window id ",
-                        1,
-                        outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
-    Assert.assertEquals("Batch should not be written",
-                        BATCH_SIZE,
-                        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+    Assert.assertEquals("Commit window id ", 1, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+    Assert.assertEquals("Batch should not be written", BATCH_SIZE,
+        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
 
     outputOperator.beginWindow(2);
 
-    for(int batchCounter = 0;
-        batchCounter < HALF_BATCH_SIZE;
-        batchCounter++) {
+    for (int batchCounter = 0; batchCounter < HALF_BATCH_SIZE; batchCounter++) {
       outputOperator.input.put(new TestEvent(random.nextInt()));
     }
 
     outputOperator.endWindow();
 
 
-    Assert.assertEquals("Commit window id ",
-                        2,
-                        outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
-    Assert.assertEquals("Batch should not be written",
-                        2 * BATCH_SIZE,
-                        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+    Assert.assertEquals("Commit window id ", 2, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+    Assert.assertEquals("Batch should not be written", 2 * BATCH_SIZE,
+        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
 
     outputOperator.teardown();
   }
@@ -207,36 +193,26 @@ public class JdbcNonTransactionalBatchOutputOperatorTest
 
     outputOperator.beginWindow(0);
 
-    for(int batchCounter = 0;
-        batchCounter < BATCH_SIZE;
-        batchCounter++) {
+    for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) {
       outputOperator.input.put(new TestEvent(random.nextInt()));
     }
 
     outputOperator.endWindow();
 
 
-    Assert.assertEquals("Commit window id ",
-                        0,
-                        outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
-    Assert.assertEquals("Batch should be written",
-                        BATCH_SIZE,
-                        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+    Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+    Assert.assertEquals("Batch should be written", BATCH_SIZE,
+        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
 
     outputOperator.beginWindow(1);
 
-    for(int batchCounter = 0;
-        batchCounter < BATCH_SIZE;
-        batchCounter++) {
+    for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) {
       outputOperator.input.put(new TestEvent(random.nextInt()));
     }
 
-    Assert.assertEquals("Commit window id ",
-                        0,
-                        outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
-    Assert.assertEquals("Batch should be written",
-                        2 * BATCH_SIZE,
-                        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+    Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+    Assert.assertEquals("Batch should be written", 2 * BATCH_SIZE,
+        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
 
     outputOperator.getStore().disconnect();
 
@@ -249,46 +225,33 @@ public class JdbcNonTransactionalBatchOutputOperatorTest
     OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
     outputOperator.setup(context);
 
-    Assert.assertEquals("Commit window id ",
-                        0,
-                        outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
-    Assert.assertEquals("Batch should be written",
-                        2* BATCH_SIZE,
-                        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+    Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+    Assert.assertEquals("Batch should be written", 2 * BATCH_SIZE,
+        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
 
     outputOperator.beginWindow(0);
 
-    for(int batchCounter = 0;
-        batchCounter < BATCH_SIZE;
-        batchCounter++) {
+    for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) {
       outputOperator.input.put(new TestEvent(random.nextInt()));
     }
 
     outputOperator.endWindow();
 
-    Assert.assertEquals("Commit window id ",
-                        0,
-                        outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
-    Assert.assertEquals("Batch should be written",
-                        2 * BATCH_SIZE,
-                        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+    Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+    Assert.assertEquals("Batch should be written", 2 * BATCH_SIZE,
+        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
 
     outputOperator.beginWindow(1);
 
-    for(int batchCounter = 0;
-        batchCounter < BATCH_SIZE;
-        batchCounter++) {
+    for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) {
       outputOperator.input.put(new TestEvent(random.nextInt()));
     }
 
     outputOperator.endWindow();
 
-    Assert.assertEquals("Commit window id ",
-                        1,
-                        outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
-    Assert.assertEquals("Batch should be written",
-                        3 * BATCH_SIZE,
-                        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+    Assert.assertEquals("Commit window id ", 1, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+    Assert.assertEquals("Batch should be written", 3 * BATCH_SIZE,
+        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
   }
 
   @Test
@@ -300,36 +263,26 @@ public class JdbcNonTransactionalBatchOutputOperatorTest
 
     outputOperator.beginWindow(0);
 
-    for(int batchCounter = 0;
-        batchCounter < BATCH_SIZE;
-        batchCounter++) {
+    for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) {
       outputOperator.input.put(new TestEvent(random.nextInt()));
     }
 
     outputOperator.endWindow();
 
 
-    Assert.assertEquals("Commit window id ",
-                        0,
-                        outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
-    Assert.assertEquals("Batch should be written",
-                        BATCH_SIZE,
-                        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+    Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+    Assert.assertEquals("Batch should be written", BATCH_SIZE,
+        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
 
     outputOperator.beginWindow(1);
 
-    for(int batchCounter = 0;
-        batchCounter < HALF_BATCH_SIZE;
-        batchCounter++) {
+    for (int batchCounter = 0; batchCounter < HALF_BATCH_SIZE; batchCounter++) {
       outputOperator.input.put(new TestEvent(random.nextInt()));
     }
 
-    Assert.assertEquals("Commit window id ",
-                        0,
-                        outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
-    Assert.assertEquals("Batch should be written",
-                        BATCH_SIZE,
-                        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+    Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+    Assert.assertEquals("Batch should be written", BATCH_SIZE,
+        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
 
     outputOperator.getStore().disconnect();
 
@@ -339,49 +292,38 @@ public class JdbcNonTransactionalBatchOutputOperatorTest
     attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_LEAST_ONCE);
     attributeMap.put(OperatorContext.ACTIVATION_WINDOW_ID, 0L);
     attributeMap.put(DAG.APPLICATION_ID, APP_ID);
-    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(
+        OPERATOR_ID, attributeMap);
+
     outputOperator.setup(context);
 
-    Assert.assertEquals("Commit window id ",
-                        0,
-                        outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
-    Assert.assertEquals("Batch should be written",
-                        BATCH_SIZE,
-                        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+    Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+    Assert.assertEquals("Batch should be written", BATCH_SIZE,
+        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
 
     outputOperator.beginWindow(0);
 
-    for(int batchCounter = 0;
-        batchCounter < BATCH_SIZE;
-        batchCounter++) {
+    for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) {
       outputOperator.input.put(new TestEvent(random.nextInt()));
     }
 
     outputOperator.endWindow();
 
-    Assert.assertEquals("Commit window id ",
-                        0,
-                        outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
-    Assert.assertEquals("Batch should be written",
-                        BATCH_SIZE,
-                        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+    Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+    Assert.assertEquals("Batch should be written", BATCH_SIZE,
+        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
 
     outputOperator.beginWindow(1);
 
-    for(int batchCounter = 0;
-        batchCounter < HALF_BATCH_SIZE;
-        batchCounter++) {
+    for (int batchCounter = 0; batchCounter < HALF_BATCH_SIZE; batchCounter++) {
       outputOperator.input.put(new TestEvent(random.nextInt()));
     }
 
     outputOperator.endWindow();
 
-    Assert.assertEquals("Commit window id ",
-                        1,
-                        outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
-    Assert.assertEquals("Batch should be written",
-                        2 * BATCH_SIZE,
-                        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+    Assert.assertEquals("Commit window id ", 1, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+    Assert.assertEquals("Batch should be written", 2 * BATCH_SIZE,
+        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
   }
 
   @Test
@@ -393,35 +335,25 @@ public class JdbcNonTransactionalBatchOutputOperatorTest
 
     outputOperator.beginWindow(0);
 
-    for(int batchCounter = 0;
-        batchCounter < BATCH_SIZE;
-        batchCounter++) {
+    for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) {
       outputOperator.input.put(new TestEvent(random.nextInt()));
     }
 
     outputOperator.endWindow();
 
-    Assert.assertEquals("Commit window id ",
-                        0,
-                        outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
-    Assert.assertEquals("Batch should be written",
-                        BATCH_SIZE,
-                        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+    Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+    Assert.assertEquals("Batch should be written", BATCH_SIZE,
+        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
 
     outputOperator.beginWindow(1);
 
-    for(int batchCounter = 0;
-        batchCounter < BATCH_SIZE;
-        batchCounter++) {
+    for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) {
       outputOperator.input.put(new TestEvent(random.nextInt()));
     }
 
-    Assert.assertEquals("Commit window id ",
-                        0,
-                        outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
-    Assert.assertEquals("Batch should be written",
-                        2 * BATCH_SIZE,
-                        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+    Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+    Assert.assertEquals("Batch should be written", 2 * BATCH_SIZE,
+        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
 
     outputOperator.getStore().disconnect();
 
@@ -436,20 +368,15 @@ public class JdbcNonTransactionalBatchOutputOperatorTest
 
     outputOperator.beginWindow(2);
 
-    for(int batchCounter = 0;
-        batchCounter < BATCH_SIZE;
-        batchCounter++) {
+    for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) {
       outputOperator.input.put(new TestEvent(random.nextInt()));
     }
 
     outputOperator.endWindow();
 
-    Assert.assertEquals("Commit window id ",
-                        2,
-                        outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
-    Assert.assertEquals("Batch should be written",
-                        3 * BATCH_SIZE,
-                        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+    Assert.assertEquals("Commit window id ", 2, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+    Assert.assertEquals("Batch should be written", 3 * BATCH_SIZE,
+        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
   }
 
   @Test
@@ -461,36 +388,26 @@ public class JdbcNonTransactionalBatchOutputOperatorTest
 
     outputOperator.beginWindow(0);
 
-    for(int batchCounter = 0;
-        batchCounter < BATCH_SIZE;
-        batchCounter++) {
+    for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) {
       outputOperator.input.put(new TestEvent(random.nextInt()));
     }
 
     outputOperator.endWindow();
 
 
-    Assert.assertEquals("Commit window id ",
-                        0,
-                        outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
-    Assert.assertEquals("Batch should be written",
-                        BATCH_SIZE,
-                        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+    Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+    Assert.assertEquals("Batch should be written", BATCH_SIZE,
+        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
 
     outputOperator.beginWindow(1);
 
-    for(int batchCounter = 0;
-        batchCounter < HALF_BATCH_SIZE;
-        batchCounter++) {
+    for (int batchCounter = 0; batchCounter < HALF_BATCH_SIZE; batchCounter++) {
       outputOperator.input.put(new TestEvent(random.nextInt()));
     }
 
-    Assert.assertEquals("Commit window id ",
-                        0,
-                        outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
-    Assert.assertEquals("Batch should be written",
-                        BATCH_SIZE,
-                        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+    Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+    Assert.assertEquals("Batch should be written", BATCH_SIZE,
+        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
 
     outputOperator.getStore().disconnect();
 
@@ -503,28 +420,20 @@ public class JdbcNonTransactionalBatchOutputOperatorTest
     OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
     outputOperator.setup(context);
 
-    Assert.assertEquals("Commit window id ",
-                        0,
-                        outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
-    Assert.assertEquals("Batch should be written",
-                        BATCH_SIZE,
-                        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+    Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+    Assert.assertEquals("Batch should be written", BATCH_SIZE,
+        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
 
     outputOperator.beginWindow(2);
 
-    for(int batchCounter = 0;
-        batchCounter < BATCH_SIZE;
-        batchCounter++) {
+    for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) {
       outputOperator.input.put(new TestEvent(random.nextInt()));
     }
 
     outputOperator.endWindow();
 
-    Assert.assertEquals("Commit window id ",
-                        2,
-                        outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
-    Assert.assertEquals("Batch should be written",
-                        2 * BATCH_SIZE,
-                        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+    Assert.assertEquals("Commit window id ", 2, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+    Assert.assertEquals("Batch should be written", 2 * BATCH_SIZE,
+        outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java
index d539aaa..9880aae 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java
@@ -32,10 +32,11 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+
 import com.datatorrent.api.DAG;
-import com.datatorrent.netlet.util.DTThrowable;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
-import com.google.common.collect.Lists;
+import com.datatorrent.netlet.util.DTThrowable;
 
 /**
  * Test for {@link AbstractJdbcNonTransactionableOutputOperator Operator}
@@ -70,8 +71,7 @@ public class JdbcNonTransactionalOutputOperatorTest
 
       String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " (ID INTEGER)";
       stmt.executeUpdate(createTable);
-    }
-    catch (Throwable e) {
+    } catch (Throwable e) {
       DTThrowable.rethrow(e);
     }
   }
@@ -84,8 +84,7 @@ public class JdbcNonTransactionalOutputOperatorTest
 
       String cleanTable = "delete from " + TABLE_NAME;
       stmt.executeUpdate(cleanTable);
-    }
-    catch (SQLException e) {
+    } catch (SQLException e) {
       throw new RuntimeException(e);
     }
   }
@@ -122,12 +121,11 @@ public class JdbcNonTransactionalOutputOperatorTest
         String countQuery = "SELECT * FROM " + TABLE_NAME;
         ResultSet resultSet = stmt.executeQuery(countQuery);
         int count = 0;
-        while(resultSet.next()) {
+        while (resultSet.next()) {
           count++;
         }
         return count;
-      }
-      catch (SQLException e) {
+      } catch (SQLException e) {
         throw new RuntimeException("fetching count", e);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStoreTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStoreTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStoreTest.java
index 56359fb..ef8f9a0 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStoreTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStoreTest.java
@@ -32,8 +32,7 @@ public class JdbcNonTransactionalStoreTest
     JdbcNonTransactionalStore jdbcNonTransactionalStore = new JdbcNonTransactionalStore();
     try {
       jdbcNonTransactionalStore.beginTransaction();
-    }
-    catch(RuntimeException e) {
+    } catch (RuntimeException e) {
       return;
     }
     Assert.fail("Exception should be thrown");
@@ -45,8 +44,7 @@ public class JdbcNonTransactionalStoreTest
     JdbcNonTransactionalStore jdbcNonTransactionalStore = new JdbcNonTransactionalStore();
     try {
       jdbcNonTransactionalStore.commitTransaction();
-    }
-    catch(RuntimeException e) {
+    } catch (RuntimeException e) {
       return;
     }
     Assert.fail("Exception should be thrown");
@@ -58,8 +56,7 @@ public class JdbcNonTransactionalStoreTest
     JdbcNonTransactionalStore jdbcNonTransactionalStore = new JdbcNonTransactionalStore();
     try {
       jdbcNonTransactionalStore.rollbackTransaction();
-    }
-    catch(RuntimeException e) {
+    } catch (RuntimeException e) {
       return;
     }
     Assert.fail("Exception should be thrown");
@@ -71,8 +68,7 @@ public class JdbcNonTransactionalStoreTest
     JdbcNonTransactionalStore jdbcNonTransactionalStore = new JdbcNonTransactionalStore();
     try {
       jdbcNonTransactionalStore.isInTransaction();
-    }
-    catch(RuntimeException e) {
+    } catch (RuntimeException e) {
       return;
     }
     Assert.fail("Exception should be thrown");

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java b/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java
index d9daf97..98be88c 100644
--- a/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java
+++ b/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java
@@ -25,18 +25,22 @@ import java.io.PrintStream;
 import java.util.Date;
 import java.util.List;
 
-import org.apache.commons.io.FileUtils;
 import org.joda.time.DateTime;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+
+import com.google.common.collect.Lists;
 
 import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.datatorrent.lib.util.TestUtils;
 import com.datatorrent.lib.util.TestUtils.TestInfo;
-import com.google.common.collect.Lists;
 
 public class JsonFormatterTest
 {
@@ -150,7 +154,7 @@ public class JsonFormatterTest
     Assert.assertEquals(1, validDataSink.collectedTuples.size());
     Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
     String expectedJSONString = "{\"a\":0,\"b\":0,\"c\":null,\"d\":null,\"date\":null}";
-    System.out.println(validDataSink.collectedTuples.get(0));
+    LOG.debug("{}", validDataSink.collectedTuples.get(0));
     Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
   }
 
@@ -199,4 +203,6 @@ public class JsonFormatterTest
   {
   }
 
+  private static final Logger LOG = LoggerFactory.getLogger(JsonFormatterTest.class);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java b/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java
index 50ed3bd..bb51ca4 100644
--- a/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java
+++ b/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java
@@ -33,12 +33,13 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 
-import com.datatorrent.lib.parser.XmlParser;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.datatorrent.lib.util.TestUtils;
 
@@ -76,6 +77,7 @@ public class XmlFormatterTest
     }
 
   }
+
   @Test
   public void testOperatorSerialization()
   {
@@ -162,7 +164,7 @@ public class XmlFormatterTest
 
     operator.setup(null);
     operator.in.process(e);
-    System.out.println(validDataSink.collectedTuples.get(0));
+    LOG.debug("{}", validDataSink.collectedTuples.get(0));
     Assert.assertEquals(1, validDataSink.collectedTuples.size());
     Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
     String expected = "<EmployeeBean>" + "<name>john</name>"
@@ -202,7 +204,7 @@ public class XmlFormatterTest
 
   }
 
-  @XmlType (propOrder={"name","dept","eid", "dateOfJoining", "address"})
+  @XmlType(propOrder = {"name", "dept", "eid", "dateOfJoining", "address"})
   public static class EmployeeBean
   {
 
@@ -292,4 +294,6 @@ public class XmlFormatterTest
 
   }
 
+  private static final Logger LOG = LoggerFactory.getLogger(XmlFormatterTest.class);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java b/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java
index d8138d5..2ece6b2 100644
--- a/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java
+++ b/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java
@@ -34,7 +34,7 @@ import com.datatorrent.api.Context.OperatorContext;
  */
 public class OperatorContextTestHelper
 {
-  private final static ThreadLocal<DateFormat> DATE_FORMAT_THREAD_LOCAL = new ThreadLocal<DateFormat>()
+  private static final ThreadLocal<DateFormat> DATE_FORMAT_THREAD_LOCAL = new ThreadLocal<DateFormat>()
   {
     @Override
     protected DateFormat initialValue()

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/helper/SamplePubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/helper/SamplePubSubWebSocketServlet.java b/library/src/test/java/com/datatorrent/lib/helper/SamplePubSubWebSocketServlet.java
index 2cc0e1d..9d501aa 100644
--- a/library/src/test/java/com/datatorrent/lib/helper/SamplePubSubWebSocketServlet.java
+++ b/library/src/test/java/com/datatorrent/lib/helper/SamplePubSubWebSocketServlet.java
@@ -65,11 +65,9 @@ public class SamplePubSubWebSocketServlet extends WebSocketServlet
           if (topic != null) {
             subscriber = this;
           }
-        }
-        else if (type.equals("unsubscribe")) {
+        } else if (type.equals("unsubscribe")) {
           subscriber = null;
-        }
-        else if (type.equals("publish")) {
+        } else if (type.equals("publish")) {
           Object data = map.get("data");
           if (data != null) {
             if (subscriber != null) {
@@ -77,8 +75,7 @@ public class SamplePubSubWebSocketServlet extends WebSocketServlet
             }
           }
         }
-      }
-      catch (Exception ex) {
+      } catch (Exception ex) {
         LOG.warn("Data read error", ex);
       }
     }
@@ -109,8 +106,7 @@ public class SamplePubSubWebSocketServlet extends WebSocketServlet
     map.put("data", data);
     try {
       webSocket.connection.sendMessage(mapper.writeValueAsString(map));
-    }
-    catch (IOException ex) {
+    } catch (IOException ex) {
       LOG.warn("Connection send error", ex);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/ApacheRandomLogsTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/ApacheRandomLogsTest.java b/library/src/test/java/com/datatorrent/lib/io/ApacheRandomLogsTest.java
index e619ff8..a2c021e 100644
--- a/library/src/test/java/com/datatorrent/lib/io/ApacheRandomLogsTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/ApacheRandomLogsTest.java
@@ -21,7 +21,6 @@ package com.datatorrent.lib.io;
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.datatorrent.lib.io.ApacheGenRandomLogs;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 
 /**
@@ -29,37 +28,39 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
  */
 public class ApacheRandomLogsTest
 {
-	@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
+  @SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
   @Test
-	public void test()
-	{
-		ApacheGenRandomLogs oper = new ApacheGenRandomLogs();
-		CollectorTestSink sink = new CollectorTestSink();
-		oper.outport.setSink(sink);
-		oper.setup(null);
+  public void test()
+  {
+    ApacheGenRandomLogs oper = new ApacheGenRandomLogs();
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+    oper.setup(null);
 
-		Thread t = new EmitTuples(oper);
-		t.start();
-		try
-		{
-			Thread.sleep(1000);
-		} catch (InterruptedException e)
-		{
-		}
-		t.stop();
-		Assert.assertTrue("Tuples emitted", sink.collectedTuples.size() > 0);
-	}
+    Thread t = new EmitTuples(oper);
+    t.start();
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {
+      //Fixme
+    }
+    t.stop();
+    Assert.assertTrue("Tuples emitted", sink.collectedTuples.size() > 0);
+  }
 
-	private class EmitTuples extends Thread {
-		private ApacheGenRandomLogs oper;
-		public EmitTuples(ApacheGenRandomLogs oper)
-		{
-			this.oper = oper;
-		}
-		@Override
-		public void run()
-		{
-			oper.emitTuples();
-		}
-	}
+  private class EmitTuples extends Thread
+  {
+    private ApacheGenRandomLogs oper;
+
+    public EmitTuples(ApacheGenRandomLogs oper)
+    {
+      this.oper = oper;
+    }
+
+    @Override
+    public void run()
+    {
+      oper.emitTuples();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java
index ada1148..959e25e 100644
--- a/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java
@@ -29,7 +29,6 @@ import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.commons.io.IOUtils;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.eclipse.jetty.server.Handler;
@@ -39,6 +38,8 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.commons.io.IOUtils;
+
 import com.datatorrent.lib.testbench.CollectorTestSink;
 
 /**
@@ -74,8 +75,7 @@ public class HttpJsonChunksInputOperatorTest
           response.getOutputStream().println();
           response.getOutputStream().println(0);
           response.getOutputStream().flush();
-        }
-        catch (JSONException e) {
+        } catch (JSONException e) {
           response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Error generating response: " + e.toString());
         }
 
@@ -88,8 +88,6 @@ public class HttpJsonChunksInputOperatorTest
     server.start();
 
     String url = "http://localhost:" + server.getConnectors()[0].getLocalPort() + "/somecontext";
-    System.out.println(url);
-
     final AbstractHttpInputOperator operator = new HttpJsonChunksInputOperator();
 
     CollectorTestSink sink = new CollectorTestSink();

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java
index 10ec6c2..be405ab 100644
--- a/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java
@@ -28,7 +28,6 @@ import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.commons.io.IOUtils;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Request;
 import org.eclipse.jetty.server.Server;
@@ -36,6 +35,8 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.commons.io.IOUtils;
+
 import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.datatorrent.lib.util.TestUtils;
 
@@ -79,7 +80,6 @@ public class HttpLinesInputOperatorTest
     server.start();
 
     String url = "http://localhost:" + server.getConnectors()[0].getLocalPort() + "/somecontext";
-    System.out.println(url);
 
     final HttpLinesInputOperator operator = new HttpLinesInputOperator();
     CollectorTestSink<String> sink = TestUtils.setSink(operator.outputPort, new CollectorTestSink<String>());

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/HttpMultiValuedMapGetOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/HttpMultiValuedMapGetOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/HttpMultiValuedMapGetOperatorTest.java
index b321cfa..927317e 100644
--- a/library/src/test/java/com/datatorrent/lib/io/HttpMultiValuedMapGetOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/HttpMultiValuedMapGetOperatorTest.java
@@ -28,9 +28,6 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.MultivaluedMap;
 
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.core.util.MultivaluedMapImpl;
-
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Request;
 import org.eclipse.jetty.server.Server;
@@ -38,6 +35,9 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+
 import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.datatorrent.lib.util.TestUtils;
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/HttpPostOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/HttpPostOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/HttpPostOutputOperatorTest.java
index eb69bdf..afe518b 100644
--- a/library/src/test/java/com/datatorrent/lib/io/HttpPostOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/HttpPostOutputOperatorTest.java
@@ -31,7 +31,6 @@ import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.core.MediaType;
 
-import org.apache.commons.io.IOUtils;
 import org.codehaus.jettison.json.JSONObject;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Request;
@@ -40,6 +39,8 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.commons.io.IOUtils;
+
 /**
  * Functional test for {@link com.datatorrent.lib.io.HttpPostOutputOperator}.
  */
@@ -75,8 +76,6 @@ public class HttpPostOutputOperatorTest
     server.start();
 
     String url = "http://localhost:" + server.getConnectors()[0].getLocalPort() + "/somecontext";
-    System.out.println("url: " + url);
-
 
     HttpPostOutputOperator<Object> node = new HttpPostOutputOperator<Object>();
     node.setUrl(url);
@@ -95,7 +94,6 @@ public class HttpPostOutputOperatorTest
     }
 
     Assert.assertEquals("number requests", 1, receivedMessages.size());
-    System.out.println(receivedMessages.get(0));
     JSONObject json = new JSONObject(data);
     Assert.assertTrue("request body " + receivedMessages.get(0), receivedMessages.get(0).contains(json.toString()));
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java b/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
index 4b29830..acb3fc4 100644
--- a/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
@@ -79,8 +79,7 @@ public class IdempotentStorageManagerTest
       storageManager.teardown();
       try {
         FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
-      }
-      catch (IOException e) {
+      } catch (IOException e) {
         throw new RuntimeException(e);
       }
     }
@@ -105,7 +104,7 @@ public class IdempotentStorageManagerTest
     testMeta.storageManager.save(data, 1, 1);
     testMeta.storageManager.setup(testMeta.context);
     @SuppressWarnings("unchecked")
-    Map<Integer, String> decoded = (Map<Integer, String>) testMeta.storageManager.load(1, 1);
+    Map<Integer, String> decoded = (Map<Integer, String>)testMeta.storageManager.load(1, 1);
     Assert.assertEquals("dataOf1", data, decoded);
   }
 
@@ -130,8 +129,7 @@ public class IdempotentStorageManagerTest
     for (Integer operatorId : decodedStates.keySet()) {
       if (operatorId == 1) {
         Assert.assertEquals("data of 1", dataOf1, decodedStates.get(1));
-      }
-      else {
+      } else {
         Assert.assertEquals("data of 2", dataOf2, decodedStates.get(2));
       }
     }
@@ -182,8 +180,7 @@ public class IdempotentStorageManagerTest
     testMeta.storageManager.save(dataOf2, 2, 1);
     testMeta.storageManager.save(dataOf3, 3, 1);
 
-    testMeta.storageManager.partitioned(Lists.<IdempotentStorageManager>newArrayList(testMeta.storageManager),
-      Sets.newHashSet(2, 3));
+    testMeta.storageManager.partitioned(Lists.<IdempotentStorageManager>newArrayList(testMeta.storageManager), Sets.newHashSet(2, 3));
     testMeta.storageManager.setup(testMeta.context);
     testMeta.storageManager.deleteUpTo(1, 6);
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java
index 43f9186..7801619 100644
--- a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java
@@ -34,8 +34,7 @@ public abstract class PubSubWebSocketAppDataOperatorTest
   public static final URI GATEWAY_CONNECT_ADDRESS;
   public static final URI URI_ADDRESS;
 
-  static
-  {
+  static {
     try {
       GATEWAY_CONNECT_ADDRESS = new URI("ws://" + GATEWAY_CONNECT_ADDRESS_STRING + "/pubsub");
       URI_ADDRESS = new URI(URI_ADDRESS_STRING);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java
index 3dc5be3..fc92429 100644
--- a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java
@@ -22,13 +22,11 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import com.datatorrent.lib.helper.OperatorContextTestHelper;
-
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.OperatorContext;
-
 import com.datatorrent.common.experimental.AppData.ConnectionInfoProvider;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
 
 public class PubSubWebSocketAppDataQueryTest extends PubSubWebSocketAppDataOperatorTest
 {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
index 402bb34..e165649 100644
--- a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
@@ -50,7 +50,7 @@ public class PubSubWebSocketOperatorTest
     contextHandler.addServlet(sh, "/pubsub");
     contextHandler.addServlet(sh, "/*");
     server.start();
-    Connector connector[] = server.getConnectors();
+    Connector[] connector = server.getConnectors();
     URI uri = URI.create("ws://localhost:" + connector[0].getLocalPort() + "/pubsub");
 
     PubSubWebSocketOutputOperator<Object> outputOperator = new PubSubWebSocketOutputOperator<Object>();
@@ -100,10 +100,10 @@ public class PubSubWebSocketOperatorTest
     Assert.assertTrue("tuples emitted", sink.collectedTuples.size() > 1);
 
     @SuppressWarnings("unchecked")
-    Map<String, String> tuple = (Map<String, String>) sink.collectedTuples.get(0);
+    Map<String, String> tuple = (Map<String, String>)sink.collectedTuples.get(0);
     Assert.assertEquals("Expects {\"hello\":\"world\"} as data", "world", tuple.get("hello"));
 
-    String stringResult = (String) sink.collectedTuples.get(1);
+    String stringResult = (String)sink.collectedTuples.get(1);
     Assert.assertEquals("Expects {\"hello\":\"world\"} as data", stringData, stringResult);
 
     inputOperator.deactivate();

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/SmtpOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/SmtpOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/SmtpOutputOperatorTest.java
index b4a649a..6bd839d 100644
--- a/library/src/test/java/com/datatorrent/lib/io/SmtpOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/SmtpOutputOperatorTest.java
@@ -26,20 +26,22 @@ import javax.mail.Message;
 import javax.mail.internet.InternetAddress;
 import javax.mail.internet.MimeMessage;
 
-import org.apache.hadoop.conf.Configuration;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.StreamingApplication;
+import org.apache.hadoop.conf.Configuration;
+
 import com.google.common.collect.Maps;
 import com.icegreen.greenmail.util.GreenMail;
 import com.icegreen.greenmail.util.ServerSetup;
 import com.icegreen.greenmail.util.ServerSetupTest;
 
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+
 public class SmtpOutputOperatorTest
 {
 
@@ -97,7 +99,7 @@ public class SmtpOutputOperatorTest
     String expectedContent = content.replace("{}", data.toString()).trim();
 
     Assert.assertTrue(expectedContent.equals(receivedContent));
-    Assert.assertEquals(from, ((InternetAddress) messages[0].getFrom()[0]).getAddress());
+    Assert.assertEquals(from, ((InternetAddress)messages[0].getFrom()[0]).getAddress());
     Assert.assertEquals(to, messages[0].getRecipients(Message.RecipientType.TO)[0].toString());
     Assert.assertEquals(cc, messages[0].getRecipients(Message.RecipientType.TO)[1].toString());
     Assert.assertEquals(cc, messages[0].getRecipients(Message.RecipientType.CC)[0].toString());
@@ -121,7 +123,7 @@ public class SmtpOutputOperatorTest
     String expectedContent = content.replace("{}", data.toString()).trim();
 
     Assert.assertTrue(expectedContent.equals(receivedContent));
-    Assert.assertEquals(from, ((InternetAddress) messages[0].getFrom()[0]).getAddress());
+    Assert.assertEquals(from, ((InternetAddress)messages[0].getFrom()[0]).getAddress());
     Assert.assertEquals(to, messages[0].getAllRecipients()[0].toString());
   }
 
@@ -139,7 +141,8 @@ public class SmtpOutputOperatorTest
     conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.recipients.CC", cc);
 
     final AtomicReference<SmtpOutputOperator> o1 = new AtomicReference<SmtpOutputOperator>();
-    StreamingApplication app = new StreamingApplication() {
+    StreamingApplication app = new StreamingApplication()
+    {
       @Override
       public void populateDAG(DAG dag, Configuration conf)
       {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/SocketInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/SocketInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/SocketInputOperatorTest.java
index 79d780d..a79ace7 100644
--- a/library/src/test/java/com/datatorrent/lib/io/SocketInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/SocketInputOperatorTest.java
@@ -87,9 +87,8 @@ public class SocketInputOperatorTest
           reader.close();
           clientChannel.close();
         }
-      }
-      catch (Exception e) {
-        // LOG.debug("server ", e);
+      } catch (Exception e) {
+        //fixme
       }
     }
   }
@@ -118,16 +117,15 @@ public class SocketInputOperatorTest
       operator.endWindow();
       operator.deactivate();
       operator.teardown();
-      String outputString = (String) sink.collectedTuples.get(0);
+      String outputString = (String)sink.collectedTuples.get(0);
       Assert.assertEquals(strBuffer.substring(0, outputString.length()), sink.collectedTuples.get(0));
       int length = outputString.length();
-      outputString = (String) sink.collectedTuples.get(1);
+      outputString = (String)sink.collectedTuples.get(1);
       Assert.assertEquals(strBuffer.substring(length, length + outputString.length()), sink.collectedTuples.get(1));
       server.interrupt();
       server.join();
       Thread.sleep(1000);
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       LOG.debug("exception", e);
     }
   }
@@ -161,15 +159,14 @@ public class SocketInputOperatorTest
       int endIndex = 0;
       int start = 0;
       for (int i = 0; i < 10; i++) {
-        endIndex += ((String) sink.collectedTuples.get(i)).length();
+        endIndex += ((String)sink.collectedTuples.get(i)).length();
         Assert.assertEquals(strBuffer.substring(start, endIndex), sink.collectedTuples.get(i));
         start = endIndex;
       }
       server.interrupt();
       server.join();
       Thread.sleep(1000);
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       LOG.debug("exception", e);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/WebSocketServerInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/WebSocketServerInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/WebSocketServerInputOperatorTest.java
index 184a6bd..5ce5276 100644
--- a/library/src/test/java/com/datatorrent/lib/io/WebSocketServerInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/WebSocketServerInputOperatorTest.java
@@ -19,19 +19,18 @@
 package com.datatorrent.lib.io;
 
 import java.net.URI;
-
 import java.util.List;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.collect.Lists;
-
 import org.eclipse.jetty.websocket.WebSocket;
 import org.eclipse.jetty.websocket.WebSocketClient;
 import org.eclipse.jetty.websocket.WebSocketClientFactory;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+
 public class WebSocketServerInputOperatorTest
 {
   @Test
@@ -57,11 +56,10 @@ public class WebSocketServerInputOperatorTest
 
     long startTime = System.currentTimeMillis();
 
-    while(startTime + 10000 > System.currentTimeMillis()) {
-      if(TestWSSIO.messages.size() >= 1) {
+    while (startTime + 10000 > System.currentTimeMillis()) {
+      if (TestWSSIO.messages.size() >= 1) {
         break;
       }
-
       Thread.sleep(100);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorFailureHandlingTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorFailureHandlingTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorFailureHandlingTest.java
index b50fe29..d11125b 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorFailureHandlingTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorFailureHandlingTest.java
@@ -18,23 +18,35 @@
  */
 package com.datatorrent.lib.io.fs;
 
-import com.datatorrent.api.*;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.HashSet;
 
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.datatorrent.lib.util.TestUtils;
 import com.datatorrent.lib.util.TestUtils.TestInfo;
-import com.google.common.collect.*;
-import java.io.*;
-import java.util.*;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.*;
-import org.junit.*;
 
 public class AbstractFileInputOperatorFailureHandlingTest
 {
-  @Rule public TestInfo testMeta = new TestInfo();
+  @Rule
+  public TestInfo testMeta = new TestInfo();
 
   public static class TestFileInputOperator extends AbstractFileInputOperator<String>
   {
@@ -60,7 +72,8 @@ public class AbstractFileInputOperatorFailureHandlingTest
       br = null;
     }
 
-    @Override protected InputStream retryFailedFile(FailedFile ff) throws IOException
+    @Override
+    protected InputStream retryFailedFile(FailedFile ff) throws IOException
     {
       count = 0;
       return super.retryFailedFile(ff);
@@ -90,13 +103,13 @@ public class AbstractFileInputOperatorFailureHandlingTest
     FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.getDir()).getAbsolutePath()), true);
     HashSet<String> allLines = Sets.newHashSet();
     // Create files with 100 records.
-    for (int file=0; file<10; file++) {
+    for (int file = 0; file < 10; file++) {
       HashSet<String> lines = Sets.newHashSet();
-      for (int line=0; line<10; line++) {
-        lines.add("f"+file+"l"+line);
+      for (int line = 0; line < 10; line++) {
+        lines.add("f" + file + "l" + line);
       }
       allLines.addAll(lines);
-      FileUtils.write(new File(testMeta.getDir(), "file"+file), StringUtils.join(lines, '\n'));
+      FileUtils.write(new File(testMeta.getDir(), "file" + file), StringUtils.join(lines, '\n'));
     }
 
     Thread.sleep(10);
@@ -104,15 +117,16 @@ public class AbstractFileInputOperatorFailureHandlingTest
     TestFileInputOperator oper = new TestFileInputOperator();
 
     CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    CollectorTestSink<Object> sink = (CollectorTestSink)queryResults;
     oper.output.setSink(sink);
 
     oper.setDirectory(testMeta.getDir());
     oper.getScanner().setFilePatternRegexp(".*file[\\d]");
 
-    oper.setup(new OperatorContextTestHelper.TestIdOperatorContext(1, new Attribute.AttributeMap.DefaultAttributeMap()));
-    for (long wid=0; wid<1000; wid++) {
+    oper.setup(
+        new OperatorContextTestHelper.TestIdOperatorContext(1, new Attribute.AttributeMap.DefaultAttributeMap()));
+    for (long wid = 0; wid < 1000; wid++) {
       oper.beginWindow(wid);
       oper.emitTuples();
       oper.endWindow();

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
index 3a8661c..ea16185 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
@@ -86,34 +86,35 @@ public class AbstractFileInputOperatorTest
     }
   }
 
-  @Rule public TestMeta testMeta = new TestMeta();
+  @Rule
+  public TestMeta testMeta = new TestMeta();
   
   @Test
   public void testSinglePartiton() throws Exception
   {
     FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
     HashSet<String> allLines = Sets.newHashSet();
-    for (int file=0; file<2; file++) {
+    for (int file = 0; file < 2; file++) {
       HashSet<String> lines = Sets.newHashSet();
-      for (int line=0; line<2; line++) {
-        lines.add("f"+file+"l"+line);
+      for (int line = 0; line < 2; line++) {
+        lines.add("f" + file + "l" + line);
       }
       allLines.addAll(lines);
-      FileUtils.write(new File(testMeta.dir, "file"+file), StringUtils.join(lines, '\n'));
+      FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
     }
 
     LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
 
     CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    CollectorTestSink<Object> sink = (CollectorTestSink)queryResults;
     oper.output.setSink(sink);
 
     oper.setDirectory(testMeta.dir);
     oper.getScanner().setFilePatternRegexp(".*file[\\d]");
 
     oper.setup(testMeta.context);
-    for (long wid=0; wid<3; wid++) {
+    for (long wid = 0; wid < 3; wid++) {
       oper.beginWindow(wid);
       oper.emitTuples();
       oper.endWindow();
@@ -133,8 +134,8 @@ public class AbstractFileInputOperatorTest
 
     Path path = new Path(new File(testMeta.dir).getAbsolutePath());
     FileContext.getLocalFSFileContext().delete(path, true);
-    for (int file=0; file<4; file++) {
-      FileUtils.write(new File(testMeta.dir, "partition00"+file), "");
+    for (int file = 0; file < 4; file++) {
+      FileUtils.write(new File(testMeta.dir, "partition00" + file), "");
     }
 
     FileSystem fs = FileSystem.get(FileContext.getLocalFSFileContext().getDefaultFileSystem().getUri(), new Configuration());
@@ -158,13 +159,14 @@ public class AbstractFileInputOperatorTest
 
     Path path = new Path(new File(testMeta.dir).getAbsolutePath());
     FileContext.getLocalFSFileContext().delete(path, true);
-    for (int file=0; file<4; file++) {
-      FileUtils.write(new File(testMeta.dir, "partition00"+file), "");
+    for (int file = 0; file < 4; file++) {
+      FileUtils.write(new File(testMeta.dir, "partition00" + file), "");
     }
 
     List<Partition<AbstractFileInputOperator<String>>> partitions = Lists.newArrayList();
     partitions.add(new DefaultPartition<AbstractFileInputOperator<String>>(oper));
-    Collection<Partition<AbstractFileInputOperator<String>>> newPartitions = oper.definePartitions(partitions, new PartitioningContextImpl(null, 2));
+    Collection<Partition<AbstractFileInputOperator<String>>> newPartitions = oper.definePartitions(partitions,
+        new PartitioningContextImpl(null, 2));
     Assert.assertEquals(2, newPartitions.size());
     Assert.assertEquals(1, oper.getCurrentPartitions()); // partitioned() wasn't called
 
@@ -202,20 +204,20 @@ public class AbstractFileInputOperatorTest
     Path path = new Path(new File(testMeta.dir).getAbsolutePath());
     FileContext.getLocalFSFileContext().delete(path, true);
     int file;
-    for (file=0; file<4; file++) {
-      FileUtils.write(new File(testMeta.dir, "partition00"+file), "a\nb\nc\n");
+    for (file = 0; file < 4; file++) {
+      FileUtils.write(new File(testMeta.dir, "partition00" + file), "a\nb\nc\n");
     }
 
     CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    CollectorTestSink<Object> sink = (CollectorTestSink)queryResults;
     oper.output.setSink(sink);
 
     int wid = 0;
 
     // Read all records to populate processedList in operator.
     oper.setup(testMeta.context);
-    for(int i = 0; i < 10; i++) {
+    for (int i = 0; i < 10; i++) {
       oper.beginWindow(wid);
       oper.emitTuples();
       oper.endWindow();
@@ -233,7 +235,7 @@ public class AbstractFileInputOperatorTest
     partitions.add(new DefaultPartition<AbstractFileInputOperator<String>>(oper));
     // incremental capacity controlled partitionCount property
     Collection<Partition<AbstractFileInputOperator<String>>> newPartitions = initialState.definePartitions(partitions,
-      new PartitioningContextImpl(null, 0));
+        new PartitioningContextImpl(null, 0));
     Assert.assertEquals(2, newPartitions.size());
     Assert.assertEquals(1, initialState.getCurrentPartitions());
     Map<Integer, Partition<AbstractFileInputOperator<String>>> m = Maps.newHashMap();
@@ -253,8 +255,8 @@ public class AbstractFileInputOperatorTest
     }
 
     sink.clear();
-    for(int i = 0; i < 10; i++) {
-      for(AbstractFileInputOperator<String> o : opers) {
+    for (int i = 0; i < 10; i++) {
+      for (AbstractFileInputOperator<String> o : opers) {
         o.beginWindow(wid);
         o.emitTuples();
         o.endWindow();
@@ -266,12 +268,12 @@ public class AbstractFileInputOperatorTest
     Assert.assertEquals("No new tuples read ", 0, sink.collectedTuples.size());
 
     // Add four new files with 3 records each.
-    for (; file<8; file++) {
-      FileUtils.write(new File(testMeta.dir, "partition00"+file), "a\nb\nc\n");
+    for (; file < 8; file++) {
+      FileUtils.write(new File(testMeta.dir, "partition00" + file), "a\nb\nc\n");
     }
 
-    for(int i = 0; i < 10; i++) {
-      for(AbstractFileInputOperator<String> o : opers) {
+    for (int i = 0; i < 10; i++) {
+      for (AbstractFileInputOperator<String> o : opers) {
         o.beginWindow(wid);
         o.emitTuples();
         o.endWindow();
@@ -306,20 +308,20 @@ public class AbstractFileInputOperatorTest
     Path path = new Path(new File(testMeta.dir).getAbsolutePath());
     FileContext.getLocalFSFileContext().delete(path, true);
     int file;
-    for (file=0; file<4; file++) {
-      FileUtils.write(new File(testMeta.dir, "partition00"+file), "a\nb\nc\n");
+    for (file = 0; file < 4; file++) {
+      FileUtils.write(new File(testMeta.dir, "partition00" + file), "a\nb\nc\n");
     }
 
     CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    CollectorTestSink<Object> sink = (CollectorTestSink)queryResults;
     oper.output.setSink(sink);
 
     int wid = 0;
 
     //Read some records
     oper.setup(testMeta.context);
-    for(int i = 0; i < 5; i++) {
+    for (int i = 0; i < 5; i++) {
       oper.beginWindow(wid);
       oper.emitTuples();
       oper.endWindow();
@@ -357,8 +359,8 @@ public class AbstractFileInputOperatorTest
     }
 
     sink.clear();
-    for(int i = 0; i < 10; i++) {
-      for(AbstractFileInputOperator<String> o : opers) {
+    for (int i = 0; i < 10; i++) {
+      for (AbstractFileInputOperator<String> o : opers) {
         o.beginWindow(wid);
         o.emitTuples();
         o.endWindow();
@@ -391,20 +393,20 @@ public class AbstractFileInputOperatorTest
     Path path = new Path(new File(testMeta.dir).getAbsolutePath());
     FileContext.getLocalFSFileContext().delete(path, true);
     int file;
-    for (file=0; file<4; file++) {
-      FileUtils.write(new File(testMeta.dir, "partition00"+file), "a\nb\nc\n");
+    for (file = 0; file < 4; file++) {
+      FileUtils.write(new File(testMeta.dir, "partition00" + file), "a\nb\nc\n");
     }
 
     CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    CollectorTestSink<Object> sink = (CollectorTestSink)queryResults;
     oper.output.setSink(sink);
 
     int wid = 0;
 
     //Read some records
     oper.setup(testMeta.context);
-    for(int i = 0; i < 5; i++) {
+    for (int i = 0; i < 5; i++) {
       oper.beginWindow(wid);
       oper.emitTuples();
       oper.endWindow();
@@ -442,8 +444,8 @@ public class AbstractFileInputOperatorTest
     }
 
     sink.clear();
-    for(int i = 0; i < 10; i++) {
-      for(AbstractFileInputOperator<String> o : opers) {
+    for (int i = 0; i < 10; i++) {
+      for (AbstractFileInputOperator<String> o : opers) {
         o.beginWindow(wid);
         o.emitTuples();
         o.endWindow();
@@ -475,7 +477,7 @@ public class AbstractFileInputOperatorTest
 
     CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
     @SuppressWarnings({ "unchecked", "rawtypes" })
-    CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+    CollectorTestSink<Object> sink = (CollectorTestSink)queryResults;
     oper.output.setSink(sink);
 
     oper.setDirectory(testMeta.dir);
@@ -510,7 +512,7 @@ public class AbstractFileInputOperatorTest
 
     CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
     @SuppressWarnings({"unchecked", "rawtypes"})
-    CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+    CollectorTestSink<Object> sink = (CollectorTestSink)queryResults;
     oper.output.setSink(sink);
 
     oper.setDirectory(testMeta.dir);
@@ -545,7 +547,7 @@ public class AbstractFileInputOperatorTest
 
     CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
     @SuppressWarnings({"unchecked", "rawtypes"})
-    CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+    CollectorTestSink<Object> sink = (CollectorTestSink)queryResults;
     oper.output.setSink(sink);
 
     oper.setDirectory(testMeta.dir);
@@ -581,7 +583,7 @@ public class AbstractFileInputOperatorTest
 
     CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
     @SuppressWarnings({"unchecked", "rawtypes"})
-    CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+    CollectorTestSink<Object> sink = (CollectorTestSink)queryResults;
     oper.output.setSink(sink);
 
     oper.setDirectory(testMeta.dir);
@@ -713,7 +715,7 @@ public class AbstractFileInputOperatorTest
 
     CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
     @SuppressWarnings({"unchecked", "rawtypes"})
-    CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+    CollectorTestSink<Object> sink = (CollectorTestSink)queryResults;
     oper.output.setSink(sink);
 
     oper.setDirectory(testMeta.dir);
@@ -775,7 +777,7 @@ public class AbstractFileInputOperatorTest
 
     CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
     @SuppressWarnings({"unchecked", "rawtypes"})
-    CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+    CollectorTestSink<Object> sink = (CollectorTestSink)queryResults;
     oper.output.setSink(sink);
 
     oper.setDirectory(testMeta.dir);
@@ -837,7 +839,7 @@ public class AbstractFileInputOperatorTest
 
     List<TestStorageManager> storageManagers = Lists.newLinkedList();
     for (Partition<AbstractFileInputOperator<String>> p : newPartitions) {
-      storageManagers.add((TestStorageManager) p.getPartitionedInstance().idempotentStorageManager);
+      storageManagers.add((TestStorageManager)p.getPartitionedInstance().idempotentStorageManager);
     }
     Assert.assertEquals("count of storage managers", 2, storageManagers.size());