You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by br...@apache.org on 2015/11/19 21:35:06 UTC

[3/4] incubator-apex-malhar git commit: MLHR-1912 #resolve #comment fixed style violations

MLHR-1912 #resolve #comment fixed style violations


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b9aa203d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b9aa203d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b9aa203d

Branch: refs/heads/devel-3
Commit: b9aa203d7c411181665362dc0cee7420c7d61291
Parents: 2fe4bec
Author: Chandni Singh <cs...@apache.org>
Authored: Thu Nov 19 08:30:41 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Thu Nov 19 11:16:17 2015 -0800

----------------------------------------------------------------------
 .../datatorrent/lib/algo/UniqueValueCount.java  |  13 +-
 .../lib/codec/KryoSerializableStreamCodec.java  |   4 +-
 .../AbstractDBLookupCacheBackedOperator.java    |  16 ++-
 .../datatorrent/lib/db/cache/CacheManager.java  |  27 +++--
 .../datatorrent/lib/db/cache/CacheStore.java    |   5 +-
 .../lib/db/jdbc/AbstractJdbcInputOperator.java  |  15 +--
 ...stractJdbcTransactionableOutputOperator.java |  20 ++--
 .../db/jdbc/JDBCLookupCacheBackedOperator.java  |  15 +--
 .../lib/db/jdbc/JdbcPOJOInputOperator.java      |  55 ++++++---
 .../lib/db/jdbc/JdbcPOJOOutputOperator.java     |  98 +++++++++------
 .../com/datatorrent/lib/db/jdbc/JdbcStore.java  |  14 +--
 .../lib/db/jdbc/JdbcTransactionalStore.java     |  36 +++---
 .../lib/io/AbstractFTPInputOperator.java        |  14 +--
 .../lib/io/fs/AbstractFileOutputOperator.java   |  23 ++--
 .../lib/io/jms/AbstractJMSInputOperator.java    |  85 +++++++------
 .../lib/io/jms/JMSStringInputOperator.java      |  10 +-
 .../datatorrent/lib/metric/AvgAggregator.java   |   3 +-
 .../lib/metric/max/DoubleMaxAggregator.java     |   3 +-
 .../lib/metric/max/FloatMaxAggregator.java      |   3 +-
 .../lib/metric/max/IntMaxAggregator.java        |   3 +-
 .../lib/metric/max/LongMaxAggregator.java       |   1 -
 .../lib/metric/min/DoubleMinAggregator.java     |   3 +-
 .../lib/metric/min/FloatMinAggregator.java      |   3 +-
 .../lib/metric/min/IntMinAggregator.java        |   3 +-
 .../lib/metric/min/LongMinAggregator.java       |   3 +-
 .../lib/codec/KryoStreamCodecTest.java          | 120 ++++++++++---------
 .../lib/db/cache/CacheManagerTest.java          |   2 +-
 .../jdbc/JDBCLookupCacheBackedOperatorTest.java |  33 ++---
 .../lib/db/jdbc/JdbcOperatorTest.java           |  43 ++++---
 .../datatorrent/lib/db/jdbc/JdbcStoreTest.java  |   7 +-
 .../lib/io/FTPStringInputOperatorTest.java      |   7 +-
 .../io/fs/AbstractFileOutputOperatorTest.java   |   4 +-
 .../lib/io/jms/JMSStringInputOperatorTest.java  |  39 +++---
 pom.xml                                         |   2 +-
 34 files changed, 388 insertions(+), 344 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java b/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java
index 3d5ed3b..6f8750d 100644
--- a/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java
+++ b/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java
@@ -24,15 +24,13 @@ import java.util.Set;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import com.datatorrent.lib.util.KeyValPair;
-
-import com.datatorrent.common.util.BaseOperator;
-
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
 
 /**
  * This operator counts the number of unique values corresponding to a key within a window.&nbsp;
@@ -89,20 +87,21 @@ public class UniqueValueCount<K> extends BaseOperator
     @SuppressWarnings({"rawtypes", "unchecked"})
     public Unifier<KeyValPair<K, Integer>> getUnifier()
     {
-      return (Unifier) new UniqueCountUnifier<K>();
+      return (Unifier)new UniqueCountUnifier<K>();
     }
   };
 
   /**
    * The output port which emits key and set containing unique values
    */
-  public final transient DefaultOutputPort<KeyValPair<K, Set<Object>>> outputValues = new DefaultOutputPort<KeyValPair<K, Set<Object>>>()
+  public final transient DefaultOutputPort<KeyValPair<K, Set<Object>>> outputValues =
+      new DefaultOutputPort<KeyValPair<K, Set<Object>>>()
   {
     @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
     public Unifier<KeyValPair<K, Set<Object>>> getUnifier()
     {
-      return (Unifier) new UniqueCountSetUnifier<K>();
+      return (Unifier)new UniqueCountSetUnifier<K>();
     }
   };
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/codec/KryoSerializableStreamCodec.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/codec/KryoSerializableStreamCodec.java b/library/src/main/java/com/datatorrent/lib/codec/KryoSerializableStreamCodec.java
index ecfa422..99e0a6e 100644
--- a/library/src/main/java/com/datatorrent/lib/codec/KryoSerializableStreamCodec.java
+++ b/library/src/main/java/com/datatorrent/lib/codec/KryoSerializableStreamCodec.java
@@ -28,6 +28,7 @@ import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.google.common.base.Preconditions;
+
 import com.datatorrent.api.StreamCodec;
 import com.datatorrent.netlet.util.Slice;
 
@@ -56,7 +57,8 @@ public class KryoSerializableStreamCodec<T> implements StreamCodec<T>, Serializa
   }
 
   /**
-   * Registers a class with kryo. If the class of the tuple and its fields are registered then kryo serialization is more efficient.
+   * Registers a class with kryo. If the class of the tuple and its fields are registered then kryo serialization is
+   * more efficient.
    *
    * @param clazz class to register with Kryo.
    */

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/cache/AbstractDBLookupCacheBackedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/cache/AbstractDBLookupCacheBackedOperator.java b/library/src/main/java/com/datatorrent/lib/db/cache/AbstractDBLookupCacheBackedOperator.java
index 1510adf..42e02d3 100644
--- a/library/src/main/java/com/datatorrent/lib/db/cache/AbstractDBLookupCacheBackedOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/cache/AbstractDBLookupCacheBackedOperator.java
@@ -29,7 +29,6 @@ import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.Operator;
-
 import com.datatorrent.lib.db.Connectable;
 import com.datatorrent.lib.util.KeyValPair;
 
@@ -53,7 +52,8 @@ import com.datatorrent.lib.util.KeyValPair;
  * @param <S> type of store
  * @since 0.9.1
  */
-public abstract class AbstractDBLookupCacheBackedOperator<T, S extends Connectable> implements Operator, CacheManager.Backup
+public abstract class AbstractDBLookupCacheBackedOperator<T, S extends Connectable>
+    implements Operator, CacheManager.Backup
 {
   @NotNull
   protected S store;
@@ -83,11 +83,11 @@ public abstract class AbstractDBLookupCacheBackedOperator<T, S extends Connectab
     Object value = cacheManager.get(key);
 
     if (value != null) {
-      output.emit(new KeyValPair<Object, Object>(key, value));
+      output.emit(new KeyValPair<>(key, value));
     }
   }
 
-  public final transient DefaultOutputPort<KeyValPair<Object, Object>> output = new DefaultOutputPort<KeyValPair<Object, Object>>();
+  public final transient DefaultOutputPort<KeyValPair<Object, Object>> output = new DefaultOutputPort<>();
 
   @Override
   public void beginWindow(long l)
@@ -107,8 +107,7 @@ public abstract class AbstractDBLookupCacheBackedOperator<T, S extends Connectab
     cacheManager.setBackup(this);
     try {
       cacheManager.initialize();
-    }
-    catch (IOException e) {
+    } catch (IOException e) {
       throw new RuntimeException(e);
     }
   }
@@ -118,8 +117,7 @@ public abstract class AbstractDBLookupCacheBackedOperator<T, S extends Connectab
   {
     try {
       cacheManager.close();
-    }
-    catch (IOException e) {
+    } catch (IOException e) {
       LOG.error("closing manager", e);
     }
   }
@@ -171,6 +169,6 @@ public abstract class AbstractDBLookupCacheBackedOperator<T, S extends Connectab
    */
   protected abstract Object getKeyFromTuple(T tuple);
 
-  private final static Logger LOG = LoggerFactory.getLogger(AbstractDBLookupCacheBackedOperator.class);
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractDBLookupCacheBackedOperator.class);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java b/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java
index 6d9f89d..2ea31f1 100644
--- a/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java
+++ b/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java
@@ -20,7 +20,12 @@ package com.datatorrent.lib.db.cache;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.*;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -36,15 +41,15 @@ import com.datatorrent.lib.db.KeyValueStore;
 
 /**
  * Manages primary and secondary stores.<br/>
- * <p>
- * It firsts checks the primary store for a key. If the primary store doesn't have the key, it queries the backup store and retrieves the value.<br/>
+ * It first checks the primary store for a key. If the primary store doesn't have the key, it queries the backup
+ * store and retrieves the value.<br/>
  * If the key was present in the backup store, its value is returned and also saved in the primary store.
- * </p>
- * <p>
- * Typically primary store is faster but has limited size like memory and backup store is slower but unlimited like databases.<br/>
- * Store Manager can also refresh the values of keys at a specified time every day. This time is in format HH:mm:ss Z.<br/>
+ * <p/>
+ * Typically primary store is faster but has limited size like memory and backup store is slower but unlimited like
+ * databases.<br/>
+ * Store Manager can also refresh the values of keys at a specified time every day.
+ * This time is in format HH:mm:ss Z.<br/>
  * This is not thread-safe.
- * </p>
  *
  * @since 0.9.2
  */
@@ -180,7 +185,7 @@ public class CacheManager implements Closeable
   /**
    * A primary store should also provide setting the value for a key.
    */
-  public static interface Primary extends KeyValueStore
+  public interface Primary extends KeyValueStore
   {
 
     /**
@@ -195,7 +200,7 @@ public class CacheManager implements Closeable
    * Backup store is queried when {@link Primary} doesn't contain a key.<br/>
    * It also provides data needed at startup.<br/>
    */
-  public static interface Backup extends KeyValueStore
+  public interface Backup extends KeyValueStore
   {
     /**
      * <br>Backup stores are also used to initialize primary stores. This fetches initialization data.</br>
@@ -206,6 +211,6 @@ public class CacheManager implements Closeable
   }
 
   @SuppressWarnings("unused")
-  private final static Logger LOG = LoggerFactory.getLogger(CacheManager.class);
+  private static final Logger LOG = LoggerFactory.getLogger(CacheManager.class);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java b/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java
index d10fe13..72063ee 100644
--- a/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java
+++ b/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java
@@ -114,8 +114,7 @@ public class CacheStore implements CacheManager.Primary
     CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder();
     if (entryExpiryStrategy == ExpiryType.EXPIRE_AFTER_ACCESS) {
       cacheBuilder.expireAfterAccess(entryExpiryDurationInMillis, TimeUnit.MILLISECONDS);
-    }
-    else if (entryExpiryStrategy == ExpiryType.EXPIRE_AFTER_WRITE) {
+    } else if (entryExpiryStrategy == ExpiryType.EXPIRE_AFTER_WRITE) {
       cacheBuilder.expireAfterWrite(entryExpiryDurationInMillis, TimeUnit.MILLISECONDS);
     }
     cache = cacheBuilder.build();
@@ -186,7 +185,7 @@ public class CacheStore implements CacheManager.Primary
   /**
    * Strategies for time-based expiration of entries.
    */
-  public static enum ExpiryType
+  public enum ExpiryType
   {
     /**
      * Only expire the entries after the specified duration has passed since the entry was last accessed by a read

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java
index 1cb7635..fe6b077 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java
@@ -35,7 +35,8 @@ import com.datatorrent.lib.db.AbstractStoreInputOperator;
  * and emits the data as tuples.&nbsp;
  * Subclasses should implement the methods required to read the data from the database.
  * <p>
- * This is an abstract class. Sub-classes need to implement {@link #queryToRetrieveData()} and {@link #getTuple(ResultSet)}.
+ * This is an abstract class. Sub-classes need to implement
+ * {@link #queryToRetrieveData()} and {@link #getTuple(ResultSet)}.
  * </p>
  * @displayName Abstract JDBC Input
  * @category Input
@@ -84,17 +85,14 @@ public abstract class AbstractJdbcInputOperator<T> extends AbstractStoreInputOpe
           outputPort.emit(tuple);
         }
         while (result.next());
-      }
-      else {
+      } else {
         // No rows available wait for some time before retrying so as to not continuously slam the database
         Thread.sleep(waitForDataTimeout);
       }
-    }
-    catch (SQLException ex) {
+    } catch (SQLException ex) {
       store.disconnect();
       throw new RuntimeException(String.format("Error while running query: %s", query), ex);
-    }
-    catch (InterruptedException ex) {
+    } catch (InterruptedException ex) {
       throw new RuntimeException(ex);
     }
   }
@@ -106,8 +104,7 @@ public abstract class AbstractJdbcInputOperator<T> extends AbstractStoreInputOpe
     super.setup(context);
     try {
       queryStatement = store.getConnection().createStatement();
-    }
-    catch (SQLException e) {
+    } catch (SQLException e) {
       throw new RuntimeException("creating query", e);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
index b57a9b1..77b76c1 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
@@ -22,8 +22,8 @@ import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.List;
 
-import javax.annotation.Nonnull;
 import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Lists;
 
 import com.datatorrent.api.Context;
-
 import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator;
 
 /**
@@ -40,7 +39,8 @@ import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator
  * <p>
  * This operator creates a transaction at the start of window, executes batches of sql updates,
  * and closes the transaction at the end of the window. Each tuple corresponds to an SQL update statement.
- * The operator groups the updates in a batch and submits them with one call to the database. Batch processing improves performance considerably.<br/>
+ * The operator groups the updates in a batch and submits them with one call to the database. Batch processing
+ * improves performance considerably.<br/>
  * The size of a batch is configured by batchSize property.
  * </p>
  * <p>
@@ -55,7 +55,8 @@ import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator
  * @param <T> type of tuple
  * @since 0.9.4
  */
-public abstract class AbstractJdbcTransactionableOutputOperator<T> extends AbstractPassThruTransactionableStoreOutputOperator<T, JdbcTransactionalStore>
+public abstract class AbstractJdbcTransactionableOutputOperator<T>
+    extends AbstractPassThruTransactionableStoreOutputOperator<T, JdbcTransactionalStore>
 {
   protected static int DEFAULT_BATCH_SIZE = 1000;
 
@@ -80,8 +81,7 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T> extends Abstr
     super.setup(context);
     try {
       updateCommand = store.connection.prepareStatement(getUpdateCommand());
-    }
-    catch (SQLException e) {
+    } catch (SQLException e) {
       throw new RuntimeException(e);
     }
 
@@ -117,11 +117,9 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T> extends Abstr
       }
       updateCommand.executeBatch();
       updateCommand.clearBatch();
-    }
-    catch (SQLException e) {
+    } catch (SQLException e) {
       throw new RuntimeException("processing batch", e);
-    }
-    finally {
+    } finally {
       batchStartIdx += tuples.size() - batchStartIdx;
     }
   }
@@ -142,7 +140,7 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T> extends Abstr
    *
    * @return the sql statement to update a tuple in the database.
    */
-  @Nonnull
+  @NotNull
   protected abstract String getUpdateCommand();
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperator.java
index 93006eb..7ca2ae1 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperator.java
@@ -78,8 +78,7 @@ public abstract class JDBCLookupCacheBackedOperator<T> extends AbstractDBLookupC
     try {
       putStatement = store.connection.prepareStatement(insertQuery);
       getStatement = store.connection.prepareStatement(getQuery);
-    }
-    catch (SQLException e) {
+    } catch (SQLException e) {
       throw new RuntimeException(e);
     }
   }
@@ -90,8 +89,7 @@ public abstract class JDBCLookupCacheBackedOperator<T> extends AbstractDBLookupC
     try {
       preparePutStatement(putStatement, key, value);
       putStatement.executeUpdate();
-    }
-    catch (SQLException e) {
+    } catch (SQLException e) {
       throw new RuntimeException("while executing insert", e);
     }
   }
@@ -103,8 +101,7 @@ public abstract class JDBCLookupCacheBackedOperator<T> extends AbstractDBLookupC
       prepareGetStatement(getStatement, key);
       ResultSet resultSet = getStatement.executeQuery();
       return processResultSet(resultSet);
-    }
-    catch (SQLException e) {
+    } catch (SQLException e) {
       throw new RuntimeException("while fetching key", e);
     }
   }
@@ -118,8 +115,7 @@ public abstract class JDBCLookupCacheBackedOperator<T> extends AbstractDBLookupC
         prepareGetStatement(getStatement, key);
         ResultSet resultSet = getStatement.executeQuery();
         values.add(processResultSet(resultSet));
-      }
-      catch (SQLException e) {
+      } catch (SQLException e) {
         throw new RuntimeException("while fetching keys", e);
       }
     }
@@ -128,7 +124,8 @@ public abstract class JDBCLookupCacheBackedOperator<T> extends AbstractDBLookupC
 
   protected abstract void prepareGetStatement(PreparedStatement getStatement, Object key) throws SQLException;
 
-  protected abstract void preparePutStatement(PreparedStatement putStatement, Object key, Object value) throws SQLException;
+  protected abstract void preparePutStatement(PreparedStatement putStatement, Object key, Object value)
+  throws SQLException;
 
   protected abstract String fetchInsertQuery();
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
index eafff3c..3aa6fac 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
@@ -19,7 +19,14 @@
 package com.datatorrent.lib.db.jdbc;
 
 import java.math.BigDecimal;
-import java.sql.*;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
 import java.util.List;
 import java.util.Map;
 
@@ -37,7 +44,6 @@ import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-
 import com.datatorrent.lib.util.FieldInfo;
 import com.datatorrent.lib.util.PojoUtils;
 
@@ -49,8 +55,8 @@ import com.datatorrent.lib.util.PojoUtils;
  *
  * For eg. user can set the query property to a complex one : "select x1, x2 from t1, t2 where t1.x3 = t2.x3 ;"<br/>
  *
- * This implementation is generic so it uses offset/limit mechanism for batching which is not optimal. Batching is most efficient
- * when the tables/views are indexed and the query uses this information to retrieve data.<br/>
+ * This implementation is generic so it uses offset/limit mechanism for batching which is not optimal. Batching is
+ * most efficient when the tables/views are indexed and the query uses this information to retrieve data.<br/>
  * This can be achieved in sub-classes by overriding {@link #queryToRetrieveData()} and {@link #setRuntimeParams()}.
  *
  * @displayName Jdbc Input Operator
@@ -58,7 +64,8 @@ import com.datatorrent.lib.util.PojoUtils;
  * @tags database, sql, pojo, jdbc
  * @since 2.1.0
  */
-public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object> implements Operator.ActivationListener<Context.OperatorContext>
+public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object>
+    implements Operator.ActivationListener<Context.OperatorContext>
 {
   private static int DEF_FETCH_SIZE = 100;
 
@@ -363,53 +370,65 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object> imp
       switch (type) {
         case (Types.CHAR):
         case (Types.VARCHAR):
-          activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(),
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression(),
             String.class);
           break;
 
         case (Types.BOOLEAN):
-          activeFieldInfo.setterOrGetter = PojoUtils.createSetterBoolean(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterBoolean(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
 
         case (Types.TINYINT):
-          activeFieldInfo.setterOrGetter = PojoUtils.createSetterByte(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterByte(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
 
         case (Types.SMALLINT):
-          activeFieldInfo.setterOrGetter = PojoUtils.createSetterShort(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterShort(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
 
         case (Types.INTEGER):
-          activeFieldInfo.setterOrGetter = PojoUtils.createSetterInt(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterInt(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
 
         case (Types.BIGINT):
-          activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
 
         case (Types.FLOAT):
-          activeFieldInfo.setterOrGetter = PojoUtils.createSetterFloat(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterFloat(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
 
         case (Types.DOUBLE):
-          activeFieldInfo.setterOrGetter = PojoUtils.createGetterDouble(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetterDouble(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
 
         case Types.DECIMAL:
-          activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(),
-            BigDecimal.class);
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression(),
+              BigDecimal.class);
           break;
 
         case Types.TIMESTAMP:
-          activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
 
         case Types.TIME:
-          activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
 
         case Types.DATE:
-          activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
 
         default:

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
index 5399c44..ffb4160 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
@@ -18,12 +18,32 @@
  */
 package com.datatorrent.lib.db.jdbc;
 
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.collect.Lists;
+
 import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-
 import com.datatorrent.lib.util.FieldInfo;
 import com.datatorrent.lib.util.PojoUtils;
 import com.datatorrent.lib.util.PojoUtils.Getter;
@@ -34,18 +54,6 @@ import com.datatorrent.lib.util.PojoUtils.GetterInt;
 import com.datatorrent.lib.util.PojoUtils.GetterLong;
 import com.datatorrent.lib.util.PojoUtils.GetterShort;
 
-import java.math.BigDecimal;
-import java.sql.*;
-import java.util.List;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.hadoop.classification.InterfaceStability.Evolving;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
 /**
  * <p>
  * JdbcPOJOOutputOperator class.</p>
@@ -57,7 +65,8 @@ import com.google.common.collect.Lists;
  * @since 2.1.0
  */
 @Evolving
-public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object> implements Operator.ActivationListener<OperatorContext>
+public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object>
+    implements Operator.ActivationListener<OperatorContext>
 {
   @NotNull
   private List<FieldInfo> fieldInfos;
@@ -165,51 +174,51 @@ public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOpe
       switch (type) {
         case (Types.CHAR):
         case (Types.VARCHAR):
-          statement.setString(i + 1, ((Getter<Object, String>) activeFieldInfo.setterOrGetter).get(tuple));
+          statement.setString(i + 1, ((Getter<Object, String>)activeFieldInfo.setterOrGetter).get(tuple));
           break;
 
         case (Types.BOOLEAN):
-          statement.setBoolean(i + 1, ((GetterBoolean<Object>) activeFieldInfo.setterOrGetter).get(tuple));
+          statement.setBoolean(i + 1, ((GetterBoolean<Object>)activeFieldInfo.setterOrGetter).get(tuple));
           break;
 
         case (Types.TINYINT):
-          statement.setByte(i + 1, ((PojoUtils.GetterByte<Object>) activeFieldInfo.setterOrGetter).get(tuple));
+          statement.setByte(i + 1, ((PojoUtils.GetterByte<Object>)activeFieldInfo.setterOrGetter).get(tuple));
           break;
 
         case (Types.SMALLINT):
-          statement.setShort(i + 1, ((GetterShort<Object>) activeFieldInfo.setterOrGetter).get(tuple));
+          statement.setShort(i + 1, ((GetterShort<Object>)activeFieldInfo.setterOrGetter).get(tuple));
           break;
 
         case (Types.INTEGER):
-          statement.setInt(i + 1, ((GetterInt<Object>) activeFieldInfo.setterOrGetter).get(tuple));
+          statement.setInt(i + 1, ((GetterInt<Object>)activeFieldInfo.setterOrGetter).get(tuple));
           break;
 
         case (Types.BIGINT):
-          statement.setLong(i + 1, ((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple));
+          statement.setLong(i + 1, ((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple));
           break;
 
         case (Types.FLOAT):
-          statement.setFloat(i + 1, ((GetterFloat<Object>) activeFieldInfo.setterOrGetter).get(tuple));
+          statement.setFloat(i + 1, ((GetterFloat<Object>)activeFieldInfo.setterOrGetter).get(tuple));
           break;
 
         case (Types.DOUBLE):
-          statement.setDouble(i + 1, ((GetterDouble<Object>) activeFieldInfo.setterOrGetter).get(tuple));
+          statement.setDouble(i + 1, ((GetterDouble<Object>)activeFieldInfo.setterOrGetter).get(tuple));
           break;
 
         case Types.DECIMAL:
-          statement.setBigDecimal(i + 1, ((Getter<Object, BigDecimal>) activeFieldInfo.setterOrGetter).get(tuple));
+          statement.setBigDecimal(i + 1, ((Getter<Object, BigDecimal>)activeFieldInfo.setterOrGetter).get(tuple));
           break;
 
         case Types.TIMESTAMP:
-          statement.setTimestamp(i + 1, new Timestamp(((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple)));
+          statement.setTimestamp(i + 1, new Timestamp(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)));
           break;
 
         case Types.TIME:
-          statement.setTime(i + 1, new Time(((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple)));
+          statement.setTime(i + 1, new Time(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)));
           break;
 
         case Types.DATE:
-          statement.setDate(i + 1, new Date(((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple)));
+          statement.setDate(i + 1, new Date(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)));
           break;
 
         default:
@@ -271,53 +280,64 @@ public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOpe
       switch (type) {
         case (Types.CHAR):
         case (Types.VARCHAR):
-          activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(),
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression(),
             String.class);
           break;
 
         case (Types.BOOLEAN):
-          activeFieldInfo.setterOrGetter = PojoUtils.createGetterBoolean(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetterBoolean(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
 
         case (Types.TINYINT):
-          activeFieldInfo.setterOrGetter = PojoUtils.createGetterByte(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetterByte(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
 
         case (Types.SMALLINT):
-          activeFieldInfo.setterOrGetter = PojoUtils.createGetterShort(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetterShort(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
 
         case (Types.INTEGER):
-          activeFieldInfo.setterOrGetter = PojoUtils.createGetterInt(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetterInt(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
 
         case (Types.BIGINT):
-          activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
 
         case (Types.FLOAT):
-          activeFieldInfo.setterOrGetter = PojoUtils.createGetterFloat(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetterFloat(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
 
         case (Types.DOUBLE):
-          activeFieldInfo.setterOrGetter = PojoUtils.createGetterDouble(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetterDouble(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
 
         case Types.DECIMAL:
-          activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(),
-            BigDecimal.class);
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression(), BigDecimal.class);
           break;
 
         case Types.TIMESTAMP:
-          activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
 
         case Types.TIME:
-          activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
 
         case Types.DATE:
-          activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
 
         default:

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java
index 2e284ad..d9901aa 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java
@@ -32,8 +32,8 @@ import com.google.common.base.CharMatcher;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
 
-import com.datatorrent.netlet.util.DTThrowable;
 import com.datatorrent.lib.db.Connectable;
+import com.datatorrent.netlet.util.DTThrowable;
 
 /**
  * A {@link Connectable} that uses jdbc to connect to stores.
@@ -125,7 +125,8 @@ public class JdbcStore implements Connectable
    */
   public void setConnectionProperties(String connectionProps)
   {
-    String[] properties = Iterables.toArray(Splitter.on(CharMatcher.anyOf(":,")).omitEmptyStrings().trimResults().split(connectionProps), String.class);
+    String[] properties = Iterables.toArray(Splitter.on(CharMatcher.anyOf(":,")).omitEmptyStrings().trimResults()
+        .split(connectionProps), String.class);
     for (int i = 0; i < properties.length; i += 2) {
       if (i + 1 < properties.length) {
         connectionProperties.put(properties[i], properties[i + 1]);
@@ -163,8 +164,7 @@ public class JdbcStore implements Connectable
       connection = DriverManager.getConnection(databaseUrl, connectionProperties);
 
       logger.debug("JDBC connection Success");
-    }
-    catch (Throwable t) {
+    } catch (Throwable t) {
       DTThrowable.rethrow(t);
     }
   }
@@ -177,8 +177,7 @@ public class JdbcStore implements Connectable
   {
     try {
       connection.close();
-    }
-    catch (SQLException ex) {
+    } catch (SQLException ex) {
       throw new RuntimeException("closing database resource", ex);
     }
   }
@@ -188,8 +187,7 @@ public class JdbcStore implements Connectable
   {
     try {
       return !connection.isClosed();
-    }
-    catch (SQLException e) {
+    } catch (SQLException e) {
       throw new RuntimeException("is isConnected", e);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java
index e4a7229..9bc18b0 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java
@@ -24,10 +24,11 @@ import java.sql.SQLException;
 
 import javax.validation.constraints.NotNull;
 
-import com.datatorrent.lib.db.TransactionableStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.lib.db.TransactionableStore;
+
 /**
  * <p>JdbcTransactionalStore class.</p>
  *
@@ -117,7 +118,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
     super.connect();
     try {
       String command = "select " + metaTableWindowColumn + " from " + metaTable + " where " + metaTableAppIdColumn +
-        " = ? and " + metaTableOperatorIdColumn + " = ?";
+          " = ? and " + metaTableOperatorIdColumn + " = ?";
       logger.debug(command);
       lastWindowFetchCommand = connection.prepareStatement(command);
 
@@ -126,18 +127,18 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
       logger.debug(command);
       lastWindowInsertCommand = connection.prepareStatement(command);
 
-      command = "update " + metaTable + " set " + metaTableWindowColumn + " = ? where " + metaTableAppIdColumn + " = ? " +
-        " and " + metaTableOperatorIdColumn + " = ?";
+      command = "update " + metaTable + " set " + metaTableWindowColumn + " = ? where " + metaTableAppIdColumn + " = ? "
+          + " and " + metaTableOperatorIdColumn + " = ?";
       logger.debug(command);
       lastWindowUpdateCommand = connection.prepareStatement(command);
 
-      command = "delete from " + metaTable + " where " + metaTableAppIdColumn + " = ? and " + metaTableOperatorIdColumn + " = ?";
+      command = "delete from " + metaTable + " where " + metaTableAppIdColumn + " = ? and " + metaTableOperatorIdColumn
+          + " = ?";
       logger.debug(command);
       lastWindowDeleteCommand = connection.prepareStatement(command);
 
       connection.setAutoCommit(false);
-    }
-    catch (SQLException e) {
+    } catch (SQLException e) {
       throw new RuntimeException(e);
     }
   }
@@ -148,8 +149,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
     if (lastWindowUpdateCommand != null) {
       try {
         lastWindowUpdateCommand.close();
-      }
-      catch (SQLException e) {
+      } catch (SQLException e) {
         throw new RuntimeException(e);
       }
     }
@@ -168,8 +168,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
     try {
       connection.commit();
       inTransaction = false;
-    }
-    catch (SQLException e) {
+    } catch (SQLException e) {
       throw new RuntimeException(e);
     }
   }
@@ -180,8 +179,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
     try {
       connection.rollback();
       inTransaction = false;
-    }
-    catch (SQLException e) {
+    } catch (SQLException e) {
       throw new RuntimeException(e);
     }
   }
@@ -231,16 +229,14 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
       ResultSet resultSet = lastWindowFetchCommand.executeQuery();
       if (resultSet.next()) {
         lastWindow = resultSet.getLong(1);
-      }
-      else {
+      } else {
         lastWindowInsertCommand.setString(1, appId);
         lastWindowInsertCommand.setInt(2, operatorId);
         lastWindowInsertCommand.setLong(3, -1);
         lastWindowInsertCommand.executeUpdate();
       }
       return lastWindow;
-    }
-    catch (SQLException ex) {
+    } catch (SQLException ex) {
       throw new RuntimeException(ex);
     }
   }
@@ -253,8 +249,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
       lastWindowUpdateCommand.setString(2, appId);
       lastWindowUpdateCommand.setInt(3, operatorId);
       lastWindowUpdateCommand.executeUpdate();
-    }
-    catch (SQLException e) {
+    } catch (SQLException e) {
       throw new RuntimeException(e);
     }
   }
@@ -266,8 +261,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable
       lastWindowDeleteCommand.setString(1, appId);
       lastWindowDeleteCommand.setInt(2, operatorId);
       lastWindowDeleteCommand.executeUpdate();
-    }
-    catch (SQLException e) {
+    } catch (SQLException e) {
       throw new RuntimeException(e);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java
index 1cf57c5..d34f99b 100644
--- a/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java
@@ -27,15 +27,15 @@ import java.util.Map;
 
 import javax.validation.constraints.NotNull;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.commons.net.ftp.FTP;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.ftp.FTPFileSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.datatorrent.api.DefaultOutputPort;
-
 import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
 
 /**
@@ -90,10 +90,10 @@ public abstract class AbstractFTPInputOperator<T> extends AbstractFileInputOpera
   {
     super.partitioned(partitions);
     for (Partition<AbstractFileInputOperator<T>> partition : partitions.values()) {
-      ((AbstractFTPInputOperator<T>) partition.getPartitionedInstance()).host = host;
-      ((AbstractFTPInputOperator<T>) partition.getPartitionedInstance()).port = port;
-      ((AbstractFTPInputOperator<T>) partition.getPartitionedInstance()).userName = userName;
-      ((AbstractFTPInputOperator<T>) partition.getPartitionedInstance()).password = password;
+      ((AbstractFTPInputOperator<T>)partition.getPartitionedInstance()).host = host;
+      ((AbstractFTPInputOperator<T>)partition.getPartitionedInstance()).port = port;
+      ((AbstractFTPInputOperator<T>)partition.getPartitionedInstance()).userName = userName;
+      ((AbstractFTPInputOperator<T>)partition.getPartitionedInstance()).password = password;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index 2aa658f..15a208f 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -217,7 +217,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   /**
    * File output counters.
    */
-  protected final BasicCounters<MutableLong> fileCounters = new BasicCounters<MutableLong>(MutableLong.class);
+  protected final BasicCounters<MutableLong> fileCounters = new BasicCounters<>(MutableLong.class);
 
   protected StreamCodec<INPUT> streamCodec;
 
@@ -252,8 +252,9 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
    * accessed by a read or write.
    * <p/>
    * https://code.google.com/p/guava-libraries/wiki/CachesExplained <br/>
-   * Caches built with CacheBuilder do not perform cleanup and evict values "automatically," or instantly after a value expires, or anything of the sort.
-   * Instead, it performs small amounts of maintenance during write operations, or during occasional read operations if writes are rare.<br/>
+   * Caches built with CacheBuilder do not perform cleanup and evict values "automatically," or instantly after a
+   * value expires, or anything of the sort. Instead, it performs small amounts of maintenance during write
+   * operations, or during occasional read operations if writes are rare.<br/>
    * This isn't the most effective way but adds a little bit of optimization.
    */
   private Long expireStreamAfterAcessMillis;
@@ -275,8 +276,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
     {
       if (AbstractFileOutputOperator.this.streamCodec == null) {
         return super.getStreamCodec();
-      }
-      else {
+      } else {
         return streamCodec;
       }
     }
@@ -414,8 +414,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
               LOG.debug("rotating file at setup.");
               rotate(seenFileName);
             }
-          }
-          catch (IOException | ExecutionException e) {
+          } catch (IOException | ExecutionException e) {
             throw new RuntimeException(e);
           }
         }
@@ -441,8 +440,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
    * @param filename     name of the actual file.
    * @param partFileName name of the part file. When not rolling this is same as filename; otherwise this is the
    *                     latest open part file name.
-   * @param filepath     path of the file. When always writing to temp file, this is the path of the temp file; otherwise
-   *                     path of the actual file.
+   * @param filepath     path of the file. When always writing to temp file, this is the path of the temp file;
+   *                     otherwise path of the actual file.
    * @throws IOException
    */
   private void recoverFile(String filename, String partFileName, Path filepath) throws IOException
@@ -714,7 +713,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
 
     //Close all the streams you can
     Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap();
-    for(String seenFileName: openStreams.keySet()) {
+    for (String seenFileName : openStreams.keySet()) {
       FSFilterStreamContext fsFilterStreamContext = openStreams.get(seenFileName);
       try {
         long start = System.currentTimeMillis();
@@ -1238,8 +1237,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
       LOG.debug("rename from tmp {} actual {} ", tmpFileName, fileName);
       rename(srcPath, destPath);
     } else if (fs.exists(srcPath)) {
-      //if the destination and src both exists that means there was a failure between file rename and clearing the endOffset so
-      //we just delete the tmp file.
+      /*if the destination and src both exists that means there was a failure between file rename and clearing the
+      endOffset so we just delete the tmp file*/
       LOG.debug("deleting tmp {}", tmpFileName);
       fs.delete(srcPath, true);
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
index fe9e35f..43445a7 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
@@ -25,14 +25,22 @@ import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.atomic.AtomicReference;
 
-import javax.jms.*;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
 
-import org.apache.commons.lang.mutable.MutableLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang.mutable.MutableLong;
+
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
@@ -43,10 +51,9 @@ import com.datatorrent.api.InputOperator;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.ActivationListener;
 import com.datatorrent.api.annotation.OperatorAnnotation;
-
-import com.datatorrent.netlet.util.DTThrowable;
 import com.datatorrent.lib.counters.BasicCounters;
 import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.netlet.util.DTThrowable;
 
 /**
  * This is the base implementation of a JMS input operator.<br/>
@@ -71,8 +78,9 @@ import com.datatorrent.lib.io.IdempotentStorageManager;
  * @since 0.3.2
  */
 @OperatorAnnotation(checkpointableWithinAppWindow = false)
-public abstract class AbstractJMSInputOperator<T> extends JMSBase implements InputOperator, ActivationListener<OperatorContext>,
-  MessageListener, ExceptionListener, Operator.IdleTimeHandler, Operator.CheckpointListener
+public abstract class AbstractJMSInputOperator<T> extends JMSBase
+    implements InputOperator, ActivationListener<OperatorContext>, MessageListener, ExceptionListener,
+    Operator.IdleTimeHandler, Operator.CheckpointListener
 {
   protected static final int DEFAULT_BUFFER_SIZE = 10 * 1024; // 10k
 
@@ -102,8 +110,8 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
   protected transient long currentWindowId;
   protected transient int emitCount;
 
-  private transient final Set<String> pendingAck;
-  private transient final Lock lock;
+  private final transient Set<String> pendingAck;
+  private final transient Lock lock;
 
   public final transient DefaultOutputPort<T> output = new DefaultOutputPort<T>();
 
@@ -129,8 +137,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
         synchronized (lock) {
           try {
             return messageConsumed(message) && super.add(message);
-          }
-          catch (JMSException e) {
+          } catch (JMSException e) {
             LOG.error("message consumption", e);
             throwable.set(e);
             throw new RuntimeException(e);
@@ -162,10 +169,10 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
   {
     try {
       if (message.getJMSReplyTo() != null) { // Send reply only if the replyTo destination is set
-        replyProducer.send(message.getJMSReplyTo(), getSession().createTextMessage("Reply: " + message.getJMSMessageID()));
+        replyProducer.send(message.getJMSReplyTo(),
+            getSession().createTextMessage("Reply: " + message.getJMSMessageID()));
       }
-    }
-    catch (JMSException ex) {
+    } catch (JMSException ex) {
       LOG.error(ex.getLocalizedMessage());
       throwable.set(ex);
       throw new RuntimeException(ex);
@@ -199,8 +206,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
       if (operatorRecoveredWindows != null) {
         Arrays.sort(operatorRecoveredWindows);
       }
-    }
-    catch (IOException e) {
+    } catch (IOException e) {
       throw new RuntimeException("fetching windows", e);
     }
   }
@@ -223,7 +229,8 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
     pendingAck.add(message.getJMSMessageID());
     MutableLong receivedCt = counters.getCounter(CounterKeys.RECEIVED);
     receivedCt.increment();
-    LOG.debug("message id: {} buffer size: {} received: {}", message.getJMSMessageID(), holdingBuffer.size(), receivedCt.longValue());
+    LOG.debug("message id: {} buffer size: {} received: {}", message.getJMSMessageID(), holdingBuffer.size(),
+        receivedCt.longValue());
     return true;
   }
 
@@ -239,11 +246,10 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
       replyProducer = getSession().createProducer(null);
 
       consumer = (isDurable() && isTopic()) ?
-        getSession().createDurableSubscriber((Topic) getDestination(), consumerName) :
-        getSession().createConsumer(getDestination());
+          getSession().createDurableSubscriber((Topic)getDestination(), consumerName) :
+          getSession().createConsumer(getDestination());
       consumer.setMessageListener(this);
-    }
-    catch (JMSException ex) {
+    } catch (JMSException ex) {
       throw new RuntimeException(ex);
     }
   }
@@ -264,7 +270,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
   {
     try {
       @SuppressWarnings("unchecked")
-      Map<String, T> recoveredData = (Map<String, T>) idempotentStorageManager.load(context.getId(), windowId);
+      Map<String, T> recoveredData = (Map<String, T>)idempotentStorageManager.load(context.getId(), windowId);
       if (recoveredData == null) {
         return;
       }
@@ -272,8 +278,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
         pendingAck.add(recoveredEntry.getKey());
         emit(recoveredEntry.getValue());
       }
-    }
-    catch (IOException e) {
+    } catch (IOException e) {
       throw new RuntimeException("replay", e);
     }
   }
@@ -306,8 +311,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
         currentWindowRecoveryState.put(message.getJMSMessageID(), payload);
         emit(payload);
       }
-    }
-    catch (JMSException e) {
+    } catch (JMSException e) {
       throw new RuntimeException("processing msg", e);
     }
   }
@@ -320,12 +324,10 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
       /* nothing to do here, so sleep for a while to avoid busy loop */
       try {
         Thread.sleep(spinMillis);
-      }
-      catch (InterruptedException ie) {
+      } catch (InterruptedException ie) {
         throw new RuntimeException(ie);
       }
-    }
-    else {
+    } else {
       DTThrowable.rethrow(lthrowable);
     }
   }
@@ -338,7 +340,8 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
    * acknowledged have been persisted because they wouldn't be redelivered. Also if they are persisted then
    * they shouldn't be re-delivered because that would cause duplicates.<br/>
    *
-   * This is why when recovery data is persisted and messages are acknowledged, the thread that consumes message is blocked.<br/>
+   * This is why when recovery data is persisted and messages are acknowledged, the thread that consumes message is
+   * blocked.<br/>
    */
   @Override
   public void endWindow()
@@ -365,27 +368,24 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
           }
           ackCompleted = true;
           pendingAck.clear();
-        }
-        catch (Throwable t) {
+        } catch (Throwable t) {
           if (!ackCompleted) {
             LOG.info("confirm recovery of {} for {} does not exist", context.getId(), currentWindowId, t);
           }
           DTThrowable.rethrow(t);
-        }
-        finally {
+        } finally {
           if (stateSaved && !ackCompleted) {
             try {
               idempotentStorageManager.delete(context.getId(), currentWindowId);
-            }
-            catch (IOException e) {
+            } catch (IOException e) {
               LOG.error("unable to delete corrupted state", e);
             }
           }
         }
       }
       emitCount = 0; //reset emit count
-    }
-    else if (operatorRecoveredWindows != null && currentWindowId < operatorRecoveredWindows[operatorRecoveredWindows.length - 1]) {
+    } else if (operatorRecoveredWindows != null &&
+        currentWindowId < operatorRecoveredWindows[operatorRecoveredWindows.length - 1]) {
       //pendingAck is not cleared for the last replayed window of this operator. This is because there is
       //still a chance that in the previous run the operator crashed after saving the state but before acknowledgement.
       pendingAck.clear();
@@ -402,8 +402,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
   {
     if (isTransacted()) {
       getSession().commit();
-    }
-    else if (getSessionAckMode(getAckMode()) == Session.CLIENT_ACKNOWLEDGE) {
+    } else if (getSessionAckMode(getAckMode()) == Session.CLIENT_ACKNOWLEDGE) {
       lastMsg.acknowledge(); // acknowledge all consumed messages till now
     }
   }
@@ -418,8 +417,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
   {
     try {
       idempotentStorageManager.deleteUpTo(context.getId(), windowId);
-    }
-    catch (IOException e) {
+    } catch (IOException e) {
       throw new RuntimeException("committing", e);
     }
   }
@@ -441,8 +439,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp
       consumer = null;
 
       super.cleanup();
-    }
-    catch (JMSException ex) {
+    } catch (JMSException ex) {
       throw new RuntimeException("at cleanup", ex);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/io/jms/JMSStringInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/JMSStringInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/JMSStringInputOperator.java
index bbc8d0f..a72d127 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/JMSStringInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/JMSStringInputOperator.java
@@ -37,12 +37,10 @@ public class JMSStringInputOperator extends AbstractJMSInputOperator<String>
   public String convert(Message message) throws JMSException
   {
     if (message instanceof TextMessage) {
-      return ((TextMessage) message).getText();
-    }
-    else if (message instanceof StreamMessage) {
-      return ((StreamMessage) message).readString();
-    }
-    else {
+      return ((TextMessage)message).getText();
+    } else if (message instanceof StreamMessage) {
+      return ((StreamMessage)message).readString();
+    } else {
       throw new IllegalArgumentException("Unhandled message type " + message.getClass().getName());
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/AvgAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/AvgAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/AvgAggregator.java
index 33f7f8d..ab9a261 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/AvgAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/AvgAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
 import java.util.Collection;
 
 import com.datatorrent.api.annotation.Name;
-
 import com.datatorrent.common.metric.SingleMetricAggregator;
 
 /**
@@ -38,7 +37,7 @@ public class AvgAggregator implements SingleMetricAggregator, Serializable
     double sum = 0;
 
     for (Object value : metricValues) {
-      sum += ((Number) value).doubleValue();
+      sum += ((Number)value).doubleValue();
     }
     return sum / metricValues.size();
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/max/DoubleMaxAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/max/DoubleMaxAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/max/DoubleMaxAggregator.java
index 40d8c04..13988f9 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/max/DoubleMaxAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/max/DoubleMaxAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
 import java.util.Collection;
 
 import com.datatorrent.api.annotation.Name;
-
 import com.datatorrent.common.metric.SingleMetricAggregator;
 
 /**
@@ -37,7 +36,7 @@ public class DoubleMaxAggregator implements SingleMetricAggregator, Serializable
   {
     Double max = null;
     for (Object value : metricValues) {
-      double dval = ((Number) value).doubleValue();
+      double dval = ((Number)value).doubleValue();
       if (max == null || dval > max) {
         max = dval;
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/max/FloatMaxAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/max/FloatMaxAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/max/FloatMaxAggregator.java
index 68f8f2a..6b228e8 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/max/FloatMaxAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/max/FloatMaxAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
 import java.util.Collection;
 
 import com.datatorrent.api.annotation.Name;
-
 import com.datatorrent.common.metric.SingleMetricAggregator;
 
 /**
@@ -37,7 +36,7 @@ public class FloatMaxAggregator implements SingleMetricAggregator, Serializable
   {
     Float max = null;
     for (Object value : metricValues) {
-      float fval = ((Number) value).floatValue();
+      float fval = ((Number)value).floatValue();
       if (max == null || fval > max) {
         max = fval;
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/max/IntMaxAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/max/IntMaxAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/max/IntMaxAggregator.java
index 71eaba0..2cd0264 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/max/IntMaxAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/max/IntMaxAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
 import java.util.Collection;
 
 import com.datatorrent.api.annotation.Name;
-
 import com.datatorrent.common.metric.SingleMetricAggregator;
 
 /**
@@ -37,7 +36,7 @@ public class IntMaxAggregator implements SingleMetricAggregator, Serializable
   {
     Integer max = null;
     for (Object value : metricValues) {
-      int ival = ((Number) value).intValue();
+      int ival = ((Number)value).intValue();
       if (max == null || ival > max) {
         max = ival;
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/max/LongMaxAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/max/LongMaxAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/max/LongMaxAggregator.java
index 43a1abd..f0aab73 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/max/LongMaxAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/max/LongMaxAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
 import java.util.Collection;
 
 import com.datatorrent.api.annotation.Name;
-
 import com.datatorrent.common.metric.SingleMetricAggregator;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/min/DoubleMinAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/min/DoubleMinAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/min/DoubleMinAggregator.java
index d65e744..1b1b712 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/min/DoubleMinAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/min/DoubleMinAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
 import java.util.Collection;
 
 import com.datatorrent.api.annotation.Name;
-
 import com.datatorrent.common.metric.SingleMetricAggregator;
 
 /**
@@ -37,7 +36,7 @@ public class DoubleMinAggregator implements SingleMetricAggregator, Serializable
   {
     Double min = null;
     for (Object value : metricValues) {
-      double dval = ((Number) value).doubleValue();
+      double dval = ((Number)value).doubleValue();
       if (min == null || dval < min) {
         min = dval;
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/min/FloatMinAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/min/FloatMinAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/min/FloatMinAggregator.java
index bbe0861..6713911 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/min/FloatMinAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/min/FloatMinAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
 import java.util.Collection;
 
 import com.datatorrent.api.annotation.Name;
-
 import com.datatorrent.common.metric.SingleMetricAggregator;
 
 /**
@@ -37,7 +36,7 @@ public class FloatMinAggregator implements SingleMetricAggregator, Serializable
   {
     Float min = null;
     for (Object metric : metricValues) {
-      float fval = ((Number) metric).floatValue();
+      float fval = ((Number)metric).floatValue();
       if (min == null || fval < min) {
         min = fval;
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/min/IntMinAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/min/IntMinAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/min/IntMinAggregator.java
index 76ffebc..72c610f 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/min/IntMinAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/min/IntMinAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
 import java.util.Collection;
 
 import com.datatorrent.api.annotation.Name;
-
 import com.datatorrent.common.metric.SingleMetricAggregator;
 
 /**
@@ -37,7 +36,7 @@ public class IntMinAggregator implements SingleMetricAggregator, Serializable
   {
     Integer min = null;
     for (Object value : metricValues) {
-      int ival = ((Number) value).intValue();
+      int ival = ((Number)value).intValue();
       if (min == null || ival < min) {
         min = ival;
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/min/LongMinAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/metric/min/LongMinAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/min/LongMinAggregator.java
index 19a1dab..0deb808 100644
--- a/library/src/main/java/com/datatorrent/lib/metric/min/LongMinAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/metric/min/LongMinAggregator.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
 import java.util.Collection;
 
 import com.datatorrent.api.annotation.Name;
-
 import com.datatorrent.common.metric.SingleMetricAggregator;
 
 /**
@@ -37,7 +36,7 @@ public class LongMinAggregator implements SingleMetricAggregator, Serializable
   {
     Long min = null;
     for (Object value : metricValues) {
-      long lval = ((Number) value).longValue();
+      long lval = ((Number)value).longValue();
       if (min == null || lval < min) {
         min = lval;
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/test/java/com/datatorrent/lib/codec/KryoStreamCodecTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/codec/KryoStreamCodecTest.java b/library/src/test/java/com/datatorrent/lib/codec/KryoStreamCodecTest.java
index 0d5c646..4affef7 100644
--- a/library/src/test/java/com/datatorrent/lib/codec/KryoStreamCodecTest.java
+++ b/library/src/test/java/com/datatorrent/lib/codec/KryoStreamCodecTest.java
@@ -31,79 +31,89 @@ import com.datatorrent.netlet.util.Slice;
  * Tests for {@link KryoSerializableStreamCodec}
  *
  */
-public class KryoStreamCodecTest {
+public class KryoStreamCodecTest
+{
 
-    public static class TestTuple {
-        final Integer field;
+  public static class TestTuple
+  {
+    final Integer field;
 
-        @SuppressWarnings("unused")
-        private TestTuple(){
-            this.field= 0;
-        }
-
-        public TestTuple(Integer x){
-            this.field= Preconditions.checkNotNull(x,"x");
-        }
+    @SuppressWarnings("unused")
+    private TestTuple()
+    {
+      this.field = 0;
+    }
 
-        @Override
-        public boolean equals(Object o) {
-           return o.getClass()== this.getClass() && ((TestTuple)o).field.equals(this.field);
-        }
+    public TestTuple(Integer x)
+    {
+      this.field = Preconditions.checkNotNull(x, "x");
+    }
 
-        @Override
-        public int hashCode() {
-            return TestTuple.class.hashCode()^ this.field.hashCode();
-        }
+    @Override
+    public boolean equals(Object o)
+    {
+      return o.getClass() == this.getClass() && ((TestTuple)o).field.equals(this.field);
     }
 
-    public static class TestKryoStreamCodec extends KryoSerializableStreamCodec<TestTuple> {
+    @Override
+    public int hashCode()
+    {
+      return TestTuple.class.hashCode() ^ this.field.hashCode();
+    }
+  }
 
-        public TestKryoStreamCodec(){
-          super();
-        }
+  public static class TestKryoStreamCodec extends KryoSerializableStreamCodec<TestTuple>
+  {
 
-        @Override
-        public int getPartition(TestTuple testTuple) {
-            return testTuple.field;
-        }
+    public TestKryoStreamCodec()
+    {
+      super();
     }
 
-    @Test
-    public void testSomeMethod() throws IOException
+    @Override
+    public int getPartition(TestTuple testTuple)
     {
-        TestKryoStreamCodec coder = new TestKryoStreamCodec();
-        TestKryoStreamCodec decoder = new TestKryoStreamCodec();
+      return testTuple.field;
+    }
+  }
 
-        KryoSerializableStreamCodec<Object> objCoder = new KryoSerializableStreamCodec<Object>();
-        Slice sliceOfObj = objCoder.toByteArray(10);
-        Integer decodedObj = (Integer) objCoder.fromByteArray(sliceOfObj);
+  @Test
+  public void testSomeMethod() throws IOException
+  {
+    TestKryoStreamCodec coder = new TestKryoStreamCodec();
+    TestKryoStreamCodec decoder = new TestKryoStreamCodec();
 
-        Assert.assertEquals("codec", decodedObj.intValue(), 10);
+    KryoSerializableStreamCodec<Object> objCoder = new KryoSerializableStreamCodec<Object>();
+    Slice sliceOfObj = objCoder.toByteArray(10);
+    Integer decodedObj = (Integer)objCoder.fromByteArray(sliceOfObj);
 
-        TestTuple tp= new TestTuple(5);
+    Assert.assertEquals("codec", decodedObj.intValue(), 10);
 
-        Slice dsp1 = coder.toByteArray(tp);
-        Slice dsp2 = coder.toByteArray(tp);
-        Assert.assertEquals(dsp1, dsp2);
+    TestTuple tp = new TestTuple(5);
 
-        Object tcObject1 = decoder.fromByteArray(dsp1);
-        assert (tp.equals(tcObject1));
+    Slice dsp1 = coder.toByteArray(tp);
+    Slice dsp2 = coder.toByteArray(tp);
+    Assert.assertEquals(dsp1, dsp2);
 
-        Object tcObject2 = decoder.fromByteArray(dsp2);
-        assert (tp.equals(tcObject2));
+    Object tcObject1 = decoder.fromByteArray(dsp1);
+    assert (tp.equals(tcObject1));
 
-        dsp1 = coder.toByteArray(tp);
-        dsp2 = coder.toByteArray(tp);
-        Assert.assertEquals(dsp1, dsp2);
-    }
+    Object tcObject2 = decoder.fromByteArray(dsp2);
+    assert (tp.equals(tcObject2));
 
-    @Test
-    public void testFinalFieldSerialization() throws Exception{
-        TestTuple t1 = new TestTuple(5);
-        TestKryoStreamCodec codec= new TestKryoStreamCodec();
+    dsp1 = coder.toByteArray(tp);
+    dsp2 = coder.toByteArray(tp);
+    Assert.assertEquals(dsp1, dsp2);
+  }
 
-        Slice dsp = codec.toByteArray(t1);
-        TestTuple t2 = (TestTuple)codec.fromByteArray(dsp);
-        Assert.assertEquals("", t1.field, t2.field);
-    }
+  @Test
+  public void testFinalFieldSerialization() throws Exception
+  {
+    TestTuple t1 = new TestTuple(5);
+    TestKryoStreamCodec codec = new TestKryoStreamCodec();
+
+    Slice dsp = codec.toByteArray(t1);
+    TestTuple t2 = (TestTuple)codec.fromByteArray(dsp);
+    Assert.assertEquals("", t1.field, t2.field);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java b/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java
index 9ee536e..caa02bf 100644
--- a/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java
@@ -62,7 +62,7 @@ public class CacheManagerTest
         @Override
         public boolean apply(@Nullable Object key)
         {
-          return key != null && (Integer) key <= 5;
+          return key != null && (Integer)key <= 5;
         }
       });
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/test/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperatorTest.java
index 9078271..46f49e2 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperatorTest.java
@@ -18,7 +18,12 @@
  */
 package com.datatorrent.lib.db.jdbc;
 
-import java.sql.*;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
 import java.util.List;
@@ -36,7 +41,6 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Maps;
 
 import com.datatorrent.api.Context;
-
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 
@@ -49,8 +53,9 @@ public class JDBCLookupCacheBackedOperatorTest
   private static final String INMEM_DB_DRIVER = org.hsqldb.jdbcDriver.class.getName();
   protected static final String TABLE_NAME = "Test_Lookup_Cache";
 
-  protected static TestJDBCLookupCacheBackedOperator lookupCacheBackedOperator = new TestJDBCLookupCacheBackedOperator();
-  protected static CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
+  protected static TestJDBCLookupCacheBackedOperator lookupCacheBackedOperator =
+      new TestJDBCLookupCacheBackedOperator();
+  protected static CollectorTestSink<Object> sink = new CollectorTestSink<>();
   protected static final Map<Integer, String> mapping = Maps.newHashMap();
 
   static {
@@ -61,9 +66,10 @@ public class JDBCLookupCacheBackedOperatorTest
     mapping.put(5, "five");
   }
 
-  protected static transient final Logger logger = LoggerFactory.getLogger(JDBCLookupCacheBackedOperatorTest.class);
+  protected static final transient Logger logger = LoggerFactory.getLogger(
+      JDBCLookupCacheBackedOperatorTest.class);
 
-  private final static Exchanger<List<Object>> bulkValuesExchanger = new Exchanger<List<Object>>();
+  private static final Exchanger<List<Object>> bulkValuesExchanger = new Exchanger<>();
 
   public static class TestJDBCLookupCacheBackedOperator extends JDBCLookupCacheBackedOperator<String>
   {
@@ -83,15 +89,15 @@ public class JDBCLookupCacheBackedOperatorTest
     @Override
     protected void preparePutStatement(PreparedStatement putStatement, Object key, Object value) throws SQLException
     {
-      putStatement.setInt(1, (Integer) key);
-      putStatement.setString(2, (String) value);
+      putStatement.setInt(1, (Integer)key);
+      putStatement.setString(2, (String)value);
 
     }
 
     @Override
     protected void prepareGetStatement(PreparedStatement getStatement, Object key) throws SQLException
     {
-      getStatement.setInt(1, (Integer) key);
+      getStatement.setInt(1, (Integer)key);
     }
 
     @Override
@@ -121,8 +127,7 @@ public class JDBCLookupCacheBackedOperatorTest
       List<Object> values = super.getAll(keys);
       try {
         bulkValuesExchanger.exchange(values);
-      }
-      catch (InterruptedException e) {
+      } catch (InterruptedException e) {
         throw new RuntimeException(e);
       }
       return values;
@@ -164,7 +169,7 @@ public class JDBCLookupCacheBackedOperatorTest
     Statement stmt = con.createStatement();
 
     String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
-      + " (col1 INTEGER, col2 VARCHAR(20))";
+        + " (col1 INTEGER, col2 VARCHAR(20))";
 
     stmt.executeUpdate(createTable);
     stmt.executeUpdate("Delete from " + TABLE_NAME);
@@ -172,8 +177,8 @@ public class JDBCLookupCacheBackedOperatorTest
     // populate the database
     for (Map.Entry<Integer, String> entry : mapping.entrySet()) {
       String insert = "INSERT INTO " + TABLE_NAME
-        + " (col1, col2) VALUES (" + entry.getKey() + ", '"
-        + entry.getValue() + "')";
+          + " (col1, col2) VALUES (" + entry.getKey() + ", '"
+          + entry.getValue() + "')";
       stmt.executeUpdate(insert);
     }