You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/05/18 20:42:07 UTC

[17/22] incubator-apex-malhar git commit: APEXMALHAR-2095 removed checkstyle violations of malhar library module

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/FieldsDescriptor.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/FieldsDescriptor.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/FieldsDescriptor.java
index 0bc4712..ec00fda 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/FieldsDescriptor.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/FieldsDescriptor.java
@@ -18,9 +18,7 @@
  */
 package com.datatorrent.lib.appdata.schemas;
 
-import it.unimi.dsi.fastutil.objects.Object2IntLinkedOpenHashMap;
 import java.io.Serializable;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -28,17 +26,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.datatorrent.lib.appdata.gpo.Serde;
 import com.datatorrent.lib.appdata.gpo.SerdeObjectPayloadFix;
 
+import it.unimi.dsi.fastutil.objects.Object2IntLinkedOpenHashMap;
+
 /**
  * This class manages the storage of fields in app data. It is used in {@link GPOMutable} objects
  * to map field names to values in order to respond to queries, it also serves as a schema which is
@@ -118,8 +118,7 @@ public class FieldsDescriptor implements Serializable
    * @param fieldToType A mapping from field names to the type of the field.
    * @param fieldToSerdeObject A mapping from field names to the corresponding serde object.
    */
-  public FieldsDescriptor(Map<String, Type> fieldToType,
-                          Map<String, Serde> fieldToSerdeObject)
+  public FieldsDescriptor(Map<String, Type> fieldToType, Map<String, Serde> fieldToSerdeObject)
   {
     setFieldToType(fieldToType);
     compressedTypes = Sets.newHashSet();
@@ -128,23 +127,17 @@ public class FieldsDescriptor implements Serializable
 
     List<String> fieldNames = typeToFields.get(Type.OBJECT);
 
-    if(fieldNames == null) {
-      throw new IllegalArgumentException("There are no fields of type " + Type.OBJECT +
-                                         " in this fieldsdescriptor");
-    }
-    else {
+    if (fieldNames == null) {
+      throw new IllegalArgumentException("There are no fields of type " + Type.OBJECT + " in this fieldsdescriptor");
+    } else {
       serdes = new Serde[fieldNames.size()];
 
       //Insert serdes in corresponding order
-      for(int index = 0;
-          index < fieldNames.size();
-          index++) {
+      for (int index = 0; index < fieldNames.size(); index++) {
         String fieldName = fieldNames.get(index);
         Serde serdeObject = fieldToSerdeObject.get(fieldName);
-        if(serdeObject == null) {
-          throw new IllegalArgumentException("The field "
-                                             + fieldName
-                                             + " doesn't have a serde object.");
+        if (serdeObject == null) {
+          throw new IllegalArgumentException("The field " + fieldName + " doesn't have a serde object.");
         }
 
         serdes[index] = serdeObject;
@@ -152,9 +145,8 @@ public class FieldsDescriptor implements Serializable
     }
   }
 
-  public FieldsDescriptor(Map<String, Type> fieldToType,
-                          Map<String, Serde> fieldToSerdeObject,
-                          SerdeObjectPayloadFix serdePayloadFix)
+  public FieldsDescriptor(Map<String, Type> fieldToType, Map<String, Serde> fieldToSerdeObject,
+      SerdeObjectPayloadFix serdePayloadFix)
   {
     this(fieldToType, fieldToSerdeObject);
     this.serdePayloadFix = serdePayloadFix;
@@ -185,13 +177,13 @@ public class FieldsDescriptor implements Serializable
     typeToFieldToIndex = Maps.newHashMap();
     typeToFields = Maps.newHashMap();
 
-    for(Map.Entry<String, Type> entry: fieldToType.entrySet()) {
+    for (Map.Entry<String, Type> entry : fieldToType.entrySet()) {
       String field = entry.getKey();
       Type type = entry.getValue();
 
       List<String> fieldsList = typeToFields.get(type);
 
-      if(fieldsList == null) {
+      if (fieldsList == null) {
         fieldsList = Lists.newArrayList();
         typeToFields.put(type, fieldsList);
       }
@@ -200,22 +192,19 @@ public class FieldsDescriptor implements Serializable
     }
 
     //ensure consistent ordering of fields
-    for(Map.Entry<Type, List<String>> entry: typeToFields.entrySet()) {
+    for (Map.Entry<Type, List<String>> entry : typeToFields.entrySet()) {
       Type type = entry.getKey();
       List<String> tempFields = entry.getValue();
 
       Collections.sort(tempFields);
       Object2IntLinkedOpenHashMap<String> fieldToIndex = new Object2IntLinkedOpenHashMap<String>();
 
-      for(int index = 0;
-          index < tempFields.size();
-          index++) {
+      for (int index = 0; index < tempFields.size(); index++) {
         String field = tempFields.get(index);
 
-        if(compressedTypes.contains(type)) {
+        if (compressedTypes.contains(type)) {
           fieldToIndex.put(field, 0);
-        }
-        else {
+        } else {
           fieldToIndex.put(field, index);
         }
       }
@@ -225,10 +214,9 @@ public class FieldsDescriptor implements Serializable
 
     //Types
 
-    if(!typeToFields.isEmpty()) {
+    if (!typeToFields.isEmpty()) {
       types = EnumSet.copyOf(typeToFields.keySet());
-    }
-    else {
+    } else {
       types = Sets.newHashSet();
     }
 
@@ -245,13 +233,12 @@ public class FieldsDescriptor implements Serializable
     //Array Sizes
     typeToSize = new Object2IntLinkedOpenHashMap<Type>();
 
-    for(Map.Entry<Type, List<String>> entry: typeToFields.entrySet()) {
+    for (Map.Entry<Type, List<String>> entry : typeToFields.entrySet()) {
       Type type = entry.getKey();
 
-      if(compressedTypes.contains(type)) {
+      if (compressedTypes.contains(type)) {
         getTypeToSize().put(type, 1);
-      }
-      else {
+      } else {
         getTypeToSize().put(type, entry.getValue().size());
       }
     }
@@ -272,8 +259,9 @@ public class FieldsDescriptor implements Serializable
    * @param fieldToType The field to type map to set for this {@link FieldsDescriptor}
    * object.
    */
-  private void setFieldToType(Map<String, Type> fieldToType) {
-    for(Map.Entry<String, Type> entry: fieldToType.entrySet()) {
+  private void setFieldToType(Map<String, Type> fieldToType)
+  {
+    for (Map.Entry<String, Type> entry : fieldToType.entrySet()) {
       Preconditions.checkNotNull(entry.getKey());
       Preconditions.checkNotNull(entry.getValue());
     }
@@ -287,7 +275,7 @@ public class FieldsDescriptor implements Serializable
    */
   private void setCompressedTypes(Set<Type> compressedTypes)
   {
-    for(Type type: compressedTypes) {
+    for (Type type : compressedTypes) {
       Preconditions.checkNotNull(type);
     }
 
@@ -319,7 +307,7 @@ public class FieldsDescriptor implements Serializable
    */
   public Fields getFields()
   {
-    if(fields == null) {
+    if (fields == null) {
       fields = new Fields(fieldToType.keySet());
     }
 
@@ -336,7 +324,7 @@ public class FieldsDescriptor implements Serializable
   {
     Map<String, Type> newFieldToType = Maps.newHashMap();
 
-    for(String field: fields.getFields()) {
+    for (String field : fields.getFields()) {
       Type type = fieldToType.get(field);
       newFieldToType.put(field, type);
     }
@@ -426,17 +414,19 @@ public class FieldsDescriptor implements Serializable
   @Override
   public boolean equals(Object obj)
   {
-    if(obj == null) {
+    if (obj == null) {
       return false;
     }
-    if(getClass() != obj.getClass()) {
+    if (getClass() != obj.getClass()) {
       return false;
     }
     final FieldsDescriptor other = (FieldsDescriptor)obj;
-    if(this.fieldToType != other.fieldToType && (this.fieldToType == null || !this.fieldToType.equals(other.fieldToType))) {
+    if (this.fieldToType != other.fieldToType && (this.fieldToType == null || !this.fieldToType.equals(
+        other.fieldToType))) {
       return false;
     }
-    if(this.compressedTypes != other.compressedTypes && (this.compressedTypes == null || !this.compressedTypes.equals(other.compressedTypes))) {
+    if (this.compressedTypes != other.compressedTypes && (this.compressedTypes == null || !this.compressedTypes.equals(
+        other.compressedTypes))) {
       return false;
     }
     return true;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/QRBase.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/QRBase.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/QRBase.java
index 6d97e9a..cbeaf2e 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/QRBase.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/QRBase.java
@@ -80,8 +80,7 @@ public abstract class QRBase extends Message
    * @param id The query id.
    * @param type The type of the query.
    */
-  public QRBase(String id,
-                String type)
+  public QRBase(String id, String type)
   {
     super(type);
     this.id = Preconditions.checkNotNull(id);
@@ -93,9 +92,7 @@ public abstract class QRBase extends Message
    * @param type The type of the query.
    * @param countdown The countdown for the query.
    */
-  public QRBase(String id,
-                String type,
-                long countdown)
+  public QRBase(String id, String type, long countdown)
   {
     this(id, type);
     setCountdown(countdown);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/Query.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Query.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Query.java
index 835c599..fc88886 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Query.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Query.java
@@ -62,20 +62,18 @@ public abstract class Query extends QRBase
    * @param id The query id.
    * @param type The type of the query.
    */
-  public Query(String id,
-               String type)
+  public Query(String id, String type)
   {
     super(id, type);
   }
+
   /**
    * Creates a query with the given id, type, and schemaKeys.
    * @param id The query id.
    * @param type The type of the query.
    * @param schemaKeys The schemaKeys for the query.
    */
-  public Query(String id,
-               String type,
-               Map<String, String> schemaKeys)
+  public Query(String id, String type, Map<String, String> schemaKeys)
   {
     super(id, type);
     setSchemaKeys(schemaKeys);
@@ -87,9 +85,7 @@ public abstract class Query extends QRBase
    * @param type The type of the query.
    * @param countdown The countdown for the query.
    */
-  public Query(String id,
-               String type,
-               long countdown)
+  public Query(String id, String type, long countdown)
   {
     super(id, type, countdown);
   }
@@ -101,10 +97,7 @@ public abstract class Query extends QRBase
    * @param countdown The countdown for the query.
    * @param schemaKeys The schemaKeys for the query.
    */
-  public Query(String id,
-               String type,
-               long countdown,
-               Map<String, String> schemaKeys)
+  public Query(String id, String type, long countdown, Map<String, String> schemaKeys)
   {
     super(id, type, countdown);
     setSchemaKeys(schemaKeys);
@@ -116,11 +109,11 @@ public abstract class Query extends QRBase
    */
   private void setSchemaKeys(Map<String, String> schemaKeys)
   {
-    if(schemaKeys == null) {
+    if (schemaKeys == null) {
       return;
     }
 
-    for(Map.Entry<String, String> entry: schemaKeys.entrySet()) {
+    for (Map.Entry<String, String> entry : schemaKeys.entrySet()) {
       Preconditions.checkNotNull(entry.getKey());
       Preconditions.checkNotNull(entry.getValue());
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/Result.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Result.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Result.java
index 0841e13..ea08ce0 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Result.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Result.java
@@ -53,8 +53,7 @@ public abstract class Result extends QRBase
    * @param query The query that this result is a response to.
    * @param countdown The countdown for this result.
    */
-  public Result(Query query,
-                long countdown)
+  public Result(Query query, long countdown)
   {
     super(query.getId());
     setQuery(query);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/ResultFormatter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/ResultFormatter.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/ResultFormatter.java
index 44a9c8c..1cd6712 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/ResultFormatter.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/ResultFormatter.java
@@ -114,34 +114,28 @@ public class ResultFormatter implements Serializable
   {
     Type type = Type.CLASS_TO_TYPE.get(object.getClass());
 
-    if(type == null) {
+    if (type == null) {
       return object.toString();
     }
 
-    switch(type) {
-      case FLOAT:
-      {
-        return format((float) ((Float) object));
+    switch (type) {
+      case FLOAT: {
+        return format((float)((Float)object));
       }
-      case DOUBLE:
-      {
-        return format((double) ((Double) object));
+      case DOUBLE: {
+        return format((double)((Double)object));
       }
-      case BYTE:
-      {
-        return format((byte) ((Byte) object));
+      case BYTE: {
+        return format((byte)((Byte)object));
       }
-      case SHORT:
-      {
-        return format((short) ((Short) object));
+      case SHORT: {
+        return format((short)((Short)object));
       }
-      case INTEGER:
-      {
-        return format((int) ((Integer) object));
+      case INTEGER: {
+        return format((int)((Integer)object));
       }
-      case LONG:
-      {
-        return format((long) ((Long) object));
+      case LONG: {
+        return format((long)((Long)object));
       }
       default:
         return object.toString();
@@ -157,7 +151,7 @@ public class ResultFormatter implements Serializable
   {
     DecimalFormat df = getFloatFormat();
 
-    if(df != null) {
+    if (df != null) {
       return df.format(val);
     }
 
@@ -173,7 +167,7 @@ public class ResultFormatter implements Serializable
   {
     DecimalFormat df = getDoubleFormat();
 
-    if(df != null) {
+    if (df != null) {
       return df.format(val);
     }
 
@@ -189,7 +183,7 @@ public class ResultFormatter implements Serializable
   {
     DecimalFormat df = getByteFormat();
 
-    if(df != null) {
+    if (df != null) {
       return df.format(val);
     }
 
@@ -205,7 +199,7 @@ public class ResultFormatter implements Serializable
   {
     DecimalFormat df = getShortFormat();
 
-    if(df != null) {
+    if (df != null) {
       return df.format(val);
     }
 
@@ -221,7 +215,7 @@ public class ResultFormatter implements Serializable
   {
     DecimalFormat df = getIntFormat();
 
-    if(df != null) {
+    if (df != null) {
       return df.format(val);
     }
 
@@ -237,7 +231,7 @@ public class ResultFormatter implements Serializable
   {
     DecimalFormat df = getLongFormat();
 
-    if(df != null) {
+    if (df != null) {
       return df.format(val);
     }
 
@@ -250,7 +244,7 @@ public class ResultFormatter implements Serializable
    */
   public DecimalFormat getFloatFormat()
   {
-    if(floatFormat == null && floatFormatString != null) {
+    if (floatFormat == null && floatFormatString != null) {
       floatFormat = new DecimalFormat(floatFormatString);
     }
 
@@ -263,7 +257,7 @@ public class ResultFormatter implements Serializable
    */
   public DecimalFormat getDoubleFormat()
   {
-    if(doubleFormat == null && doubleFormatString != null) {
+    if (doubleFormat == null && doubleFormatString != null) {
       doubleFormat = new DecimalFormat(doubleFormatString);
     }
 
@@ -276,7 +270,7 @@ public class ResultFormatter implements Serializable
    */
   public DecimalFormat getByteFormat()
   {
-    if(byteFormat == null && byteFormatString != null) {
+    if (byteFormat == null && byteFormatString != null) {
       byteFormat = new DecimalFormat(byteFormatString);
     }
 
@@ -289,7 +283,7 @@ public class ResultFormatter implements Serializable
    */
   public DecimalFormat getShortFormat()
   {
-    if(shortFormat == null && shortFormatString != null) {
+    if (shortFormat == null && shortFormatString != null) {
       shortFormat = new DecimalFormat(shortFormatString);
     }
 
@@ -302,7 +296,7 @@ public class ResultFormatter implements Serializable
    */
   public DecimalFormat getIntFormat()
   {
-    if(intFormat == null && intFormatString != null) {
+    if (intFormat == null && intFormatString != null) {
       intFormat = new DecimalFormat(intFormatString);
     }
 
@@ -315,7 +309,7 @@ public class ResultFormatter implements Serializable
    */
   public DecimalFormat getLongFormat()
   {
-    if(longFormat == null && longFormatString != null) {
+    if (longFormat == null && longFormatString != null) {
       longFormat = new DecimalFormat(longFormatString);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java
index e2e21da..8260c81 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java
@@ -41,26 +41,31 @@ public interface Schema
    * @return The id of the schema.
    */
   public int getSchemaID();
+
   /**
    * Gets the type of the schema (e.x. point, dimensions).
    * @return The type of the schema.
    */
   public String getSchemaType();
+
   /**
    * Gets the version of the schema.
    * @return The version of the schema.
    */
   public String getSchemaVersion();
+
   /**
    * Gets the AppData json to serve in response to a schema query.
    * @return The AppData json to serve in response to a schema query.
    */
   public String getSchemaJSON();
+
   /**
    * Gets the schema keys which are used to send queries targeted to this schema.
    * @return The schema keys which are used to send queries targeted to this schema.
    */
   public Map<String, String> getSchemaKeys();
+
   /**
    * Sets the schema keys for this schema.
    * @param schemaKeys The schema keys for this schema.

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaQuery.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaQuery.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaQuery.java
index 8b9cfe0..a3fd69d 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaQuery.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaQuery.java
@@ -33,9 +33,9 @@ import com.datatorrent.lib.appdata.query.serde.SimpleDataValidator;
  * This class represents a schema query.
  * @since 3.0.0
  */
-@MessageType(type=SchemaQuery.TYPE)
-@MessageDeserializerInfo(clazz=SchemaQueryDeserializer.class)
-@MessageValidatorInfo(clazz=SimpleDataValidator.class)
+@MessageType(type = SchemaQuery.TYPE)
+@MessageDeserializerInfo(clazz = SchemaQueryDeserializer.class)
+@MessageValidatorInfo(clazz = SimpleDataValidator.class)
 public class SchemaQuery extends Query
 {
   public static final String FIELD_CONTEXT = "context";
@@ -70,15 +70,12 @@ public class SchemaQuery extends Query
    * @param id The id of the query.
    * @param schemaKeys The schema keys for the requested schema.
    */
-  public SchemaQuery(String id,
-                     Map<String, String> schemaKeys)
+  public SchemaQuery(String id, Map<String, String> schemaKeys)
   {
     super(id, TYPE, schemaKeys);
   }
 
-  public SchemaQuery(String id,
-                     Map<String, String> schemaKeys,
-                     Map<String, String> contextKeys)
+  public SchemaQuery(String id, Map<String, String> schemaKeys, Map<String, String> contextKeys)
   {
     super(id, TYPE);
     this.schemaKeys = schemaKeys;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistry.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistry.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistry.java
index 93e8bf2..374e16c 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistry.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistry.java
@@ -34,17 +34,20 @@ public interface SchemaRegistry
    * @return The schema result.
    */
   public SchemaResult getSchemaResult(SchemaQuery schemaQuery);
+
   /**
    * Registers the given schema with this schema registry.
    * @param schema The schema to register with this registry.
    */
   public void registerSchema(Schema schema);
+
   /**
    * Registers the given schema with the given schema keys.
    * @param schema The schema to register.
    * @param schemaKeys The schema keys that correspond with the given schema.
    */
   public void registerSchema(Schema schema, Map<String, String> schemaKeys);
+
   /**
    * Gets the schema corresponding to the given schema keys.
    * @param schemaKeys The schema keys for a schema.

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultiple.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultiple.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultiple.java
index ee63af8..eaabcbf 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultiple.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultiple.java
@@ -18,14 +18,14 @@
  */
 package com.datatorrent.lib.appdata.schemas;
 
-import com.google.common.base.Preconditions;
 import java.io.Serializable;
-
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.base.Preconditions;
+
 import com.datatorrent.lib.appdata.datastructs.DimensionalTable;
 
 /**
@@ -66,8 +66,7 @@ public class SchemaRegistryMultiple implements SchemaRegistry, Serializable
    * @param schemaComparator The comparator used to order the schemas returned in the {@link SchemaResult} produced
    * by {@link SchemaRegistryMultiple#getSchemaResult(com.datatorrent.lib.appdata.schemas.SchemaQuery)}
    */
-  public SchemaRegistryMultiple(List<String> schemaKeys,
-                                Comparator<Schema> schemaComparator)
+  public SchemaRegistryMultiple(List<String> schemaKeys, Comparator<Schema> schemaComparator)
   {
     this(schemaKeys);
     this.schemaComparator = Preconditions.checkNotNull(schemaComparator);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistrySingle.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistrySingle.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistrySingle.java
index 8770fb2..12c5434 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistrySingle.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistrySingle.java
@@ -19,14 +19,13 @@
 package com.datatorrent.lib.appdata.schemas;
 
 import java.io.Serializable;
-
 import java.util.Map;
 
-import com.google.common.base.Preconditions;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * This schema registry holds a single schema. It is intended to be used in operators
  * which serve a single schema.
@@ -61,10 +60,8 @@ public class SchemaRegistrySingle implements SchemaRegistry, Serializable
   private void setSchema(Schema schema)
   {
     Preconditions.checkNotNull(schema);
-    Preconditions.checkArgument(schema.getSchemaKeys() == null,
-                                "The provided schema should not have schema keys "
-                                + schema.getSchemaKeys()
-                                + " since they will never be used.");
+    Preconditions.checkArgument(schema.getSchemaKeys() == null, "The provided schema should not have schema keys "
+        + schema.getSchemaKeys() + " since they will never be used.");
 
     this.schema = schema;
   }
@@ -74,8 +71,9 @@ public class SchemaRegistrySingle implements SchemaRegistry, Serializable
   {
     Preconditions.checkNotNull(schemaQuery, "This should never happen.");
 
-    if(schemaQuery.getSchemaKeys() != null) {
-      logger.error("Schema keys in the given query don't apply for single schema registry: schemaKeys={}", schemaQuery.getSchemaKeys());
+    if (schemaQuery.getSchemaKeys() != null) {
+      logger.error("Schema keys in the given query don't apply for single schema registry: schemaKeys={}",
+          schemaQuery.getSchemaKeys());
       return null;
     }
 
@@ -104,6 +102,6 @@ public class SchemaRegistrySingle implements SchemaRegistry, Serializable
   @Override
   public int size()
   {
-    return schema == null ? 0: 1;
+    return schema == null ? 0 : 1;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaResult.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaResult.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaResult.java
index 616dce9..b7cc6bc 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaResult.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaResult.java
@@ -30,8 +30,8 @@ import com.datatorrent.lib.appdata.query.serde.MessageType;
  * as a result to a {@link SchemaQuery}.
  * @since 3.0.0
  */
-@MessageType(type=SchemaResult.TYPE)
-@MessageSerializerInfo(clazz=SchemaResultSerializer.class)
+@MessageType(type = SchemaResult.TYPE)
+@MessageSerializerInfo(clazz = SchemaResultSerializer.class)
 public class SchemaResult extends Result
 {
   /**
@@ -50,8 +50,7 @@ public class SchemaResult extends Result
    * @param schemaQuery
    * @param genericSchemas
    */
-  public SchemaResult(SchemaQuery schemaQuery,
-                      Schema... genericSchemas)
+  public SchemaResult(SchemaQuery schemaQuery, Schema... genericSchemas)
   {
     super(schemaQuery);
     setGenericSchemas(genericSchemas);
@@ -63,8 +62,7 @@ public class SchemaResult extends Result
    * @param schemaQuery The schema query which this schema result will be a response to.
    * @param genericSchemas The schemas to return in the schema result payload.
    */
-  public SchemaResult(SchemaQuery schemaQuery,
-                      List<Schema> genericSchemas)
+  public SchemaResult(SchemaQuery schemaQuery, List<Schema> genericSchemas)
   {
     super(schemaQuery);
     setGenericSchemas(genericSchemas);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaResultSerializer.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaResultSerializer.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaResultSerializer.java
index 133affd..d9fd92c 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaResultSerializer.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaResultSerializer.java
@@ -43,14 +43,11 @@ public class SchemaResultSerializer implements CustomMessageSerializer
   @Override
   public String serialize(Message message, ResultFormatter resultFormatter)
   {
-    if(!(message instanceof SchemaResult))
-    {
-      throw new IllegalArgumentException("Must receive a "
-                                         + SchemaResult.class
-                                         + " object.");
+    if (!(message instanceof SchemaResult)) {
+      throw new IllegalArgumentException("Must receive a " + SchemaResult.class + " object.");
     }
 
-    SchemaResult genericSchemaResult = (SchemaResult) message;
+    SchemaResult genericSchemaResult = (SchemaResult)message;
 
     StringBuilder sb = new StringBuilder();
 
@@ -58,20 +55,16 @@ public class SchemaResultSerializer implements CustomMessageSerializer
     logger.debug("result id {}", genericSchemaResult.getId());
     logger.debug("result type {}", genericSchemaResult.getType());
 
-    sb.append("{\"").append(Result.FIELD_ID).
-    append("\":\"").append(genericSchemaResult.getId()).
-    append("\",\"").append(Result.FIELD_TYPE).
-    append("\":\"").append(genericSchemaResult.getType()).
-    append("\",\"").append(Result.FIELD_DATA).
-    append("\":");
+    sb.append("{\"").append(Result.FIELD_ID).append("\":\"").append(genericSchemaResult.getId())
+        .append("\",\"").append(Result.FIELD_TYPE).append("\":\"").append(genericSchemaResult.getType())
+        .append("\",\"").append(Result.FIELD_DATA).append("\":");
 
     JSONArray schemaArray = new JSONArray();
 
-    for(Schema schema: genericSchemaResult.getGenericSchemas()) {
+    for (Schema schema : genericSchemaResult.getGenericSchemas()) {
       try {
         schemaArray.put(new JSONObject(schema.getSchemaJSON()));
-      }
-      catch(JSONException ex) {
+      } catch (JSONException ex) {
         throw new RuntimeException(ex);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java
index dcfbc8d..b628662 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java
@@ -69,10 +69,8 @@ public class SchemaUtils
       InputStream is = SchemaUtils.class.getClassLoader().getResourceAsStream(resource);
       Preconditions.checkArgument(is != null, resource + " could not be found in the resources.");
 
-      IOUtils.copy(is,
-                   stringWriter);
-    }
-    catch(IOException ex) {
+      IOUtils.copy(is, stringWriter);
+    } catch (IOException ex) {
       throw new RuntimeException(ex);
     }
     return stringWriter.toString();
@@ -89,8 +87,7 @@ public class SchemaUtils
 
     try {
       IOUtils.copy(inputStream, stringWriter);
-    }
-    catch(IOException ex) {
+    } catch (IOException ex) {
       throw new RuntimeException(ex);
     }
 
@@ -103,8 +100,7 @@ public class SchemaUtils
    * @param fields The keys in the {@link JSONObject} to check.
    * @return True if the given {@link JSONObject} contains all the given keys. False otherwise.
    */
-  public static boolean checkValidKeys(JSONObject jo,
-                                       Fields fields)
+  public static boolean checkValidKeys(JSONObject jo, Fields fields)
   {
     @SuppressWarnings("unchecked")
     Set<String> fieldSet = fields.getFields();
@@ -119,8 +115,7 @@ public class SchemaUtils
    * @param jo The {@link JSONObject} to check.
    * @param fields The keys in the {@link JSONObject} to check.
    */
-  public static void checkValidKeysEx(JSONObject jo,
-                                      Fields fields)
+  public static void checkValidKeysEx(JSONObject jo, Fields fields)
   {
     @SuppressWarnings("unchecked")
     Set<String> fieldSet = fields.getFields();
@@ -128,10 +123,8 @@ public class SchemaUtils
 
     if (!jsonKeys.containsAll(fieldSet)) {
 
-      throw new IllegalArgumentException("The given set of keys "
-                                         + fieldSet
-                                         + " doesn't equal the set of keys in the json "
-                                         + jsonKeys);
+      throw new IllegalArgumentException("The given set of keys " + fieldSet
+          + " doesn't equal the set of keys in the json " + jsonKeys);
     }
   }
 
@@ -141,15 +134,12 @@ public class SchemaUtils
    * @param fieldsCollection The keys in the {@link JSONObject} to check.
    * @return True if the given {@link JSONObject} contains all the given keys. False otherwise.
    */
-  public static boolean checkValidKeys(JSONObject jo,
-                                       Collection<Fields> fieldsCollection)
+  public static boolean checkValidKeys(JSONObject jo, Collection<Fields> fieldsCollection)
   {
-    return checkValidKeysHelper(jo,
-                                fieldsCollection);
+    return checkValidKeysHelper(jo, fieldsCollection);
   }
 
-  private static boolean checkValidKeysHelper(JSONObject jo,
-                                              Collection<Fields> fieldsCollection)
+  private static boolean checkValidKeysHelper(JSONObject jo, Collection<Fields> fieldsCollection)
   {
     for (Fields fields: fieldsCollection) {
       LOG.debug("Checking keys: {}", fields);
@@ -158,17 +148,14 @@ public class SchemaUtils
       }
     }
 
-    LOG.error("The first level of keys in the provided JSON {} do not match any of the " +
-              "valid keysets {}",
-              getSetOfJSONKeys(jo),
-              fieldsCollection);
+    LOG.error("The first level of keys in the provided JSON {} do not match any of the " + "valid keysets {}",
+        getSetOfJSONKeys(jo), fieldsCollection);
     return false;
   }
 
   public static boolean checkValidKeys(JSONObject jo, List<Fields> fieldsCollection)
   {
-    return checkValidKeysHelper(jo,
-                                fieldsCollection);
+    return checkValidKeysHelper(jo, fieldsCollection);
   }
 
   /**
@@ -178,15 +165,12 @@ public class SchemaUtils
    * @param fieldsCollection The keys in the {@link JSONObject} to check.
    * @return True if the given {@link JSONObject} contains all the given keys. False otherwise.
    */
-  public static boolean checkValidKeysEx(JSONObject jo,
-                                         Collection<Fields> fieldsCollection)
+  public static boolean checkValidKeysEx(JSONObject jo, Collection<Fields> fieldsCollection)
   {
-    return checkValidKeysExHelper(jo,
-                                  fieldsCollection);
+    return checkValidKeysExHelper(jo, fieldsCollection);
   }
 
-  public static boolean checkValidKeysExHelper(JSONObject jo,
-                                               Collection<Fields> fieldsCollection)
+  public static boolean checkValidKeysExHelper(JSONObject jo, Collection<Fields> fieldsCollection)
   {
     for (Fields fields: fieldsCollection) {
       if (checkValidKeys(jo, fields)) {
@@ -196,10 +180,8 @@ public class SchemaUtils
 
     Set<String> keys = getSetOfJSONKeys(jo);
 
-    throw new IllegalArgumentException("The given json object has an invalid set of keys: " +
-                                       keys +
-                                       "\nOne of the following key combinations was expected:\n" +
-                                       fieldsCollection);
+    throw new IllegalArgumentException("The given json object has an invalid set of keys: " + keys
+        + "\nOne of the following key combinations was expected:\n" + fieldsCollection);
   }
 
   public static boolean checkValidKeysEx(JSONObject jo, List<Fields> fieldsCollection)
@@ -225,7 +207,7 @@ public class SchemaUtils
   {
     Map<String, String> fieldToTypeString = Maps.newHashMap();
 
-    for(Map.Entry<String, Type> entry: fieldToType.entrySet()) {
+    for (Map.Entry<String, Type> entry : fieldToType.entrySet()) {
       String field = entry.getKey();
       String typeString = entry.getValue().name();
 
@@ -237,12 +219,11 @@ public class SchemaUtils
 
   public static JSONArray findFirstKeyJSONArray(JSONObject jo, String key)
   {
-    if(jo.has(key)) {
+    if (jo.has(key)) {
       try {
         JSONArray jsonArray = jo.getJSONArray(key);
         return jsonArray;
-      }
-      catch(JSONException ex) {
+      } catch (JSONException ex) {
         throw new RuntimeException(ex);
       }
     }
@@ -250,22 +231,21 @@ public class SchemaUtils
     @SuppressWarnings("unchecked")
     Iterator<String> keyIterator = jo.keys();
 
-    while(keyIterator.hasNext()) {
+    while (keyIterator.hasNext()) {
       String childKey = keyIterator.next();
 
       JSONArray childJa = null;
 
       try {
         childJa = jo.getJSONArray(childKey);
-      }
-      catch(JSONException ex) {
+      } catch (JSONException ex) {
         //Do nothing
       }
 
-      if(childJa != null) {
+      if (childJa != null) {
         JSONArray result = findFirstKeyJSONArray(childJa, key);
 
-        if(result != null) {
+        if (result != null) {
           return result;
         }
 
@@ -276,15 +256,14 @@ public class SchemaUtils
 
       try {
         childJo = jo.getJSONObject(childKey);
-      }
-      catch(JSONException ex) {
+      } catch (JSONException ex) {
         //Do nothing
       }
 
-      if(childJo != null) {
+      if (childJo != null) {
         JSONArray result = findFirstKeyJSONArray(childJo, key);
 
-        if(result != null) {
+        if (result != null) {
           return result;
         }
       }
@@ -295,22 +274,19 @@ public class SchemaUtils
 
   public static JSONArray findFirstKeyJSONArray(JSONArray ja, String key)
   {
-    for(int index = 0;
-        index < ja.length();
-        index++) {
+    for (int index = 0; index < ja.length(); index++) {
       JSONArray childJa = null;
 
       try {
         childJa = ja.getJSONArray(index);
-      }
-      catch(JSONException ex) {
+      } catch (JSONException ex) {
         //Do nothing
       }
 
-      if(childJa != null) {
+      if (childJa != null) {
         JSONArray result = findFirstKeyJSONArray(childJa, key);
 
-        if(result != null) {
+        if (result != null) {
           return result;
         }
 
@@ -321,15 +297,14 @@ public class SchemaUtils
 
       try {
         childJo = ja.getJSONObject(index);
-      }
-      catch(JSONException ex) {
+      } catch (JSONException ex) {
         //Do nothing
       }
 
-      if(childJo != null) {
+      if (childJo != null) {
         JSONArray result = findFirstKeyJSONArray(childJo, key);
 
-        if(result != null) {
+        if (result != null) {
           return result;
         }
       }
@@ -340,12 +315,11 @@ public class SchemaUtils
 
   public static JSONObject findFirstKeyJSONObject(JSONObject jo, String key)
   {
-    if(jo.has(key)) {
+    if (jo.has(key)) {
       try {
         JSONObject jsonObject = jo.getJSONObject(key);
         return jsonObject;
-      }
-      catch(JSONException ex) {
+      } catch (JSONException ex) {
         throw new RuntimeException(ex);
       }
     }
@@ -353,22 +327,21 @@ public class SchemaUtils
     @SuppressWarnings("unchecked")
     Iterator<String> keyIterator = jo.keys();
 
-    while(keyIterator.hasNext()) {
+    while (keyIterator.hasNext()) {
       String childKey = keyIterator.next();
 
       JSONArray childJa = null;
 
       try {
         childJa = jo.getJSONArray(childKey);
-      }
-      catch(JSONException ex) {
+      } catch (JSONException ex) {
         //Do nothing
       }
 
-      if(childJa != null) {
+      if (childJa != null) {
         JSONObject result = findFirstKeyJSONObject(childJa, key);
 
-        if(result != null) {
+        if (result != null) {
           return result;
         }
 
@@ -379,15 +352,14 @@ public class SchemaUtils
 
       try {
         childJo = jo.getJSONObject(childKey);
-      }
-      catch(JSONException ex) {
+      } catch (JSONException ex) {
         //Do nothing
       }
 
-      if(childJo != null) {
+      if (childJo != null) {
         JSONObject result = findFirstKeyJSONObject(childJo, key);
 
-        if(result != null) {
+        if (result != null) {
           return result;
         }
       }
@@ -398,22 +370,19 @@ public class SchemaUtils
 
   public static JSONObject findFirstKeyJSONObject(JSONArray ja, String key)
   {
-    for(int index = 0;
-        index < ja.length();
-        index++) {
+    for (int index = 0; index < ja.length(); index++) {
       JSONArray childJa = null;
 
       try {
         childJa = ja.getJSONArray(index);
-      }
-      catch(JSONException ex) {
+      } catch (JSONException ex) {
         //Do nothing
       }
 
-      if(childJa != null) {
+      if (childJa != null) {
         JSONObject result = findFirstKeyJSONObject(childJa, key);
 
-        if(result != null) {
+        if (result != null) {
           return result;
         }
 
@@ -424,15 +393,14 @@ public class SchemaUtils
 
       try {
         childJo = ja.getJSONObject(index);
-      }
-      catch(JSONException ex) {
+      } catch (JSONException ex) {
         //Do nothing
       }
 
-      if(childJo != null) {
+      if (childJo != null) {
         JSONObject result = findFirstKeyJSONObject(childJo, key);
 
-        if(result != null) {
+        if (result != null) {
           return result;
         }
       }
@@ -452,14 +420,13 @@ public class SchemaUtils
     @SuppressWarnings("unchecked")
     Iterator<String> keyIterator = jo.keys();
 
-    while(keyIterator.hasNext()) {
+    while (keyIterator.hasNext()) {
       String key = keyIterator.next();
       String value;
 
       try {
         value = jo.getString(key);
-      }
-      catch(JSONException ex) {
+      } catch (JSONException ex) {
         throw new RuntimeException(ex);
       }
 
@@ -478,14 +445,13 @@ public class SchemaUtils
   {
     JSONObject jo = new JSONObject();
 
-    for(Map.Entry<String, String> entry: map.entrySet()) {
+    for (Map.Entry<String, String> entry : map.entrySet()) {
       String key = entry.getKey();
       String value = entry.getValue();
 
       try {
         jo.put(key, value);
-      }
-      catch(JSONException ex) {
+      } catch (JSONException ex) {
         throw new RuntimeException(ex);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java
index aed9013..5010580 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java
@@ -22,13 +22,13 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
 import com.datatorrent.netlet.util.DTThrowable;
 
 /**
@@ -118,8 +118,7 @@ public class SnapshotSchema implements Schema
    * @param schemaJSON The JSON defining this schema.
    * @param schemaKeys The schema keys tied to this schema.
    */
-  public SnapshotSchema(String schemaJSON,
-                       Map<String, String> schemaKeys)
+  public SnapshotSchema(String schemaJSON, Map<String, String> schemaKeys)
   {
     this(schemaJSON);
 
@@ -133,12 +132,9 @@ public class SnapshotSchema implements Schema
    * @param schemaJSON The schemaJSON this schema is built from.
    * @param schemaKeys The schemaKeys associated with this schema.
    */
-  public SnapshotSchema(int schemaID,
-                       String schemaJSON,
-                       Map<String, String> schemaKeys)
+  public SnapshotSchema(int schemaID, String schemaJSON, Map<String, String> schemaKeys)
   {
-    this(schemaJSON,
-         schemaKeys);
+    this(schemaJSON, schemaKeys);
 
     this.schemaID = schemaID;
   }
@@ -153,8 +149,7 @@ public class SnapshotSchema implements Schema
 
     try {
       initialize();
-    }
-    catch(Exception ex) {
+    } catch (Exception ex) {
       DTThrowable.rethrow(ex);
     }
   }
@@ -165,8 +160,7 @@ public class SnapshotSchema implements Schema
    * @param schemaID The schemaID associated with this schema.
    * @param schemaJSON The JSON that this schema is constructed from.
    */
-  public SnapshotSchema(int schemaID,
-                       String schemaJSON)
+  public SnapshotSchema(int schemaID, String schemaJSON)
   {
     this(schemaJSON);
     this.schemaID = schemaID;
@@ -177,12 +171,12 @@ public class SnapshotSchema implements Schema
   {
     changed = true;
 
-    if(schemaKeys == null) {
+    if (schemaKeys == null) {
       this.schemaKeys = null;
       return;
     }
 
-    for(Map.Entry<String, String> entry: schemaKeys.entrySet()) {
+    for (Map.Entry<String, String> entry : schemaKeys.entrySet()) {
       Preconditions.checkNotNull(entry.getKey());
       Preconditions.checkNotNull(entry.getValue());
     }
@@ -199,9 +193,8 @@ public class SnapshotSchema implements Schema
   {
     schema = new JSONObject(schemaJSON);
 
-    if(schemaKeys != null) {
-      schema.put(Schema.FIELD_SCHEMA_KEYS,
-                 SchemaUtils.createJSONObject(schemaKeys));
+    if (schemaKeys != null) {
+      schema.put(Schema.FIELD_SCHEMA_KEYS, SchemaUtils.createJSONObject(schemaKeys));
     }
 
     valueToType = Maps.newHashMap();
@@ -209,12 +202,9 @@ public class SnapshotSchema implements Schema
     JSONArray values = schema.getJSONArray(FIELD_VALUES);
 
     Preconditions.checkState(values.length() > 0,
-                             "The schema does not specify any values.");
+        "The schema does not specify any values.");
 
-    for(int index = 0;
-        index < values.length();
-        index++)
-    {
+    for (int index = 0; index < values.length(); index++) {
       JSONObject value = values.getJSONObject(index);
       String name = value.getString(FIELD_VALUES_NAME);
       String typeName = value.getString(FIELD_VALUES_TYPE);
@@ -222,9 +212,7 @@ public class SnapshotSchema implements Schema
       Type type = Type.NAME_TO_TYPE.get(typeName);
       valueToType.put(name, type);
 
-      Preconditions.checkArgument(type != null,
-                                  typeName
-                                  + " is not a valid type.");
+      Preconditions.checkArgument(type != null, typeName + " is not a valid type.");
     }
 
     valueToType = Collections.unmodifiableMap(valueToType);
@@ -233,8 +221,7 @@ public class SnapshotSchema implements Schema
     try {
       schema.put(FIELD_SCHEMA_TYPE, SCHEMA_TYPE);
       schema.put(FIELD_SCHEMA_VERSION, SCHEMA_VERSION);
-    }
-    catch(JSONException e) {
+    } catch (JSONException e) {
       throw new RuntimeException(e);
     }
 
@@ -243,8 +230,9 @@ public class SnapshotSchema implements Schema
 
   public void setTags(Set<String> tags)
   {
-    if (tags == null || tags.isEmpty())
+    if (tags == null || tags.isEmpty()) {
       throw new IllegalArgumentException("tags can't be null or empty.");
+    }
 
     try {
       JSONArray tagArray = new JSONArray(tags);
@@ -270,19 +258,16 @@ public class SnapshotSchema implements Schema
   @Override
   public String getSchemaJSON()
   {
-    if(!changed && schemaJSON != null) {
+    if (!changed && schemaJSON != null) {
       return schemaJSON;
     }
 
-    if(schemaKeys == null) {
+    if (schemaKeys == null) {
       schema.remove(Schema.FIELD_SCHEMA_KEYS);
-    }
-    else {
+    } else {
       try {
-        schema.put(Schema.FIELD_SCHEMA_KEYS,
-                        SchemaUtils.createJSONObject(schemaKeys));
-      }
-      catch(JSONException ex) {
+        schema.put(Schema.FIELD_SCHEMA_KEYS, SchemaUtils.createJSONObject(schemaKeys));
+      } catch (JSONException ex) {
         throw new RuntimeException(ex);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/TimeBucket.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/TimeBucket.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/TimeBucket.java
index f1f24bc..5570ca0 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/TimeBucket.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/TimeBucket.java
@@ -91,13 +91,11 @@ public enum TimeBucket
    */
   public static final Map<TimeUnit, TimeBucket> TIME_UNIT_TO_TIME_BUCKET;
 
-  static
-  {
+  static {
     Map<String, TimeBucket> bucketToType = Maps.newHashMap();
     Map<TimeUnit, TimeBucket> timeUnitToTimeBucket = Maps.newHashMap();
 
-    for(TimeBucket timeBucket: TimeBucket.values())
-    {
+    for (TimeBucket timeBucket : TimeBucket.values()) {
       timeUnitToTimeBucket.put(timeBucket.getTimeUnit(), timeBucket);
       bucketToType.put(timeBucket.getText(), timeBucket);
     }
@@ -188,7 +186,7 @@ public enum TimeBucket
    */
   public long roundDown(long timestamp)
   {
-    if(timeUnit == null) {
+    if (timeUnit == null) {
       return 0;
     }
 
@@ -218,8 +216,7 @@ public enum TimeBucket
   public static TimeBucket getBucketEx(String name)
   {
     TimeBucket bucket = getBucket(name);
-    Preconditions.checkArgument(bucket != null,
-                                name + " is not a valid bucket type.");
+    Preconditions.checkArgument(bucket != null, name + " is not a valid bucket type.");
     return bucket;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/schemas/Type.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Type.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Type.java
index 1748e17..5e79466 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Type.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Type.java
@@ -106,7 +106,7 @@ public enum Type implements Serializable
   static {
     Map<String, Type> nameToType = Maps.newHashMap();
 
-    for(Type type: Type.values()) {
+    for (Type type : Type.values()) {
       nameToType.put(type.getName(), type);
     }
 
@@ -114,7 +114,7 @@ public enum Type implements Serializable
 
     Map<Class<?>, Type> clazzToType = Maps.newHashMap();
 
-    for(Type type: Type.values()) {
+    for (Type type : Type.values()) {
       clazzToType.put(type.getClazz(), type);
     }
 
@@ -153,11 +153,7 @@ public enum Type implements Serializable
    * @param clazz The Class of the corresponding Java type.
    * @param higherTypes The set of types to which this type can be promoted.
    */
-  Type(String name,
-       int byteSize,
-       JSONType jsonType,
-       Class<?> clazz,
-       Set<Type> higherTypes)
+  Type(String name, int byteSize, JSONType jsonType, Class<?> clazz, Set<Type> higherTypes)
   {
     this.name = name;
     this.byteSize = byteSize;
@@ -247,8 +243,7 @@ public enum Type implements Serializable
   {
     Type type = getType(name);
 
-    Preconditions.checkArgument(type != null,
-                                name + " is not a valid type.");
+    Preconditions.checkArgument(type != null, name + " is not a valid type.");
 
     return type;
   }
@@ -262,50 +257,41 @@ public enum Type implements Serializable
    */
   public static Object promote(Type from, Type to, Object o)
   {
-    if(from == to) {
+    if (from == to) {
       return o;
     }
 
-    Preconditions.checkArgument(!(from == Type.BOOLEAN
-                                  || from == Type.CHAR
-                                  || from == Type.LONG
-                                  || from == Type.DOUBLE),
-                                "Cannot convert "
-                                + Type.BOOLEAN.getName() + ", "
-                                + Type.CHAR.getName() + ", "
-                                + Type.LONG.getName() + ", or "
-                                + Type.DOUBLE + " to a larger type.");
+    Preconditions.checkArgument(!(from == Type.BOOLEAN || from == Type.CHAR || from == Type.LONG
+        || from == Type.DOUBLE), "Cannot convert " + Type.BOOLEAN.getName() + ", " + Type.CHAR.getName()
+        + ", " + Type.LONG.getName() + ", or " + Type.DOUBLE + " to a larger type.");
 
     Preconditions.checkArgument(from.getHigherTypes().contains(to),
-                                from.getName() + " cannot be promoted to " + to.getName());
+        from.getName() + " cannot be promoted to " + to.getName());
 
-    if(from == Type.FLOAT && to == Type.DOUBLE) {
+    if (from == Type.FLOAT && to == Type.DOUBLE) {
       return (Double)((Float)o).doubleValue();
     }
 
-    if(from == Type.BYTE) {
-      if(to == Type.SHORT) {
+    if (from == Type.BYTE) {
+      if (to == Type.SHORT) {
         return (Short)((Byte)o).shortValue();
-      }
-      else if(to == Type.INTEGER) {
+      } else if (to == Type.INTEGER) {
         return (Integer)((Byte)o).intValue();
-      }
-      else if(to == Type.LONG) {
+      } else if (to == Type.LONG) {
         return (Long)((Byte)o).longValue();
       }
     }
 
-    if(from == Type.SHORT) {
-      if(to == Type.INTEGER) {
+    if (from == Type.SHORT) {
+      if (to == Type.INTEGER) {
         return (Integer)((Short)o).intValue();
-      }
-      else if(to == Type.LONG) {
+      } else if (to == Type.LONG) {
         return (Long)((Short)o).longValue();
       }
     }
 
-    if(from == Type.INTEGER
-       && to == Type.LONG) {
+    if (from == Type.INTEGER
+        && to == Type.LONG) {
       return (Long)((Integer)o).longValue();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
index 236735f..0b03e79 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
@@ -111,8 +111,8 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
   private Set<String> tags;
   
   @AppData.QueryPort
-  @InputPortFieldAnnotation(optional=true)
-  public transient final DefaultInputPort<String> query = new DefaultInputPort<String>()
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<String> query = new DefaultInputPort<String>()
   {
     @Override
     public void process(String queryJSON)
@@ -151,7 +151,7 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
     }
   }
 
-  public transient final DefaultInputPort<List<INPUT_EVENT>> input = new DefaultInputPort<List<INPUT_EVENT>>()
+  public final transient DefaultInputPort<List<INPUT_EVENT>> input = new DefaultInputPort<List<INPUT_EVENT>>()
   {
     @Override
     public void process(List<INPUT_EVENT> rows)
@@ -187,7 +187,7 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
 
 
   @Override
-  final public void activate(OperatorContext ctx)
+  public final void activate(OperatorContext ctx)
   {
     if (embeddableQueryInfoProvider != null) {
       embeddableQueryInfoProvider.activate(ctx);
@@ -213,8 +213,7 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
     if (embeddableQueryInfoProvider != null) {
       embeddableQueryInfoProvider.enableEmbeddedMode();
       LOG.info("An embeddable query operator is being used of class {}.", embeddableQueryInfoProvider.getClass().getName());
-      StoreUtils.attachOutputPortToInputPort(embeddableQueryInfoProvider.getOutputPort(),
-                                             query);
+      StoreUtils.attachOutputPortToInputPort(embeddableQueryInfoProvider.getOutputPort(), query);
       embeddableQueryInfoProvider.setup(context);
     }
   }
@@ -222,8 +221,9 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
   protected void setupSchema()
   {
     schema = new SnapshotSchema(snapshotSchemaJSON);
-    if (tags != null && !tags.isEmpty())
+    if (tags != null && !tags.isEmpty()) {
       schema.setTags(tags);
+    }
   }
 
   protected void setupQueryProcessor()
@@ -252,7 +252,7 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
     {
       Result result;
 
-      while((result = queryProcessor.process()) != null) {
+      while ((result = queryProcessor.process()) != null) {
         String resultJSON = resultSerializerFactory.serialize(result);
         LOG.debug("emitting {}", resultJSON);
         queryResult.emit(resultJSON);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMap.java b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMap.java
index 8134ff9..0fc4200 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMap.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMap.java
@@ -23,13 +23,14 @@ import java.util.Map;
 
 import javax.validation.constraints.NotNull;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
 import com.datatorrent.lib.appdata.gpo.GPOMutable;
 import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This operator accepts a list of Map&lt;String,Object&gt; objects, and serves the data under the {@link SnapshotSchema}.
@@ -59,9 +60,7 @@ public class AppDataSnapshotServerMap extends AbstractAppDataSnapshotServer<Map<
 
     List<String> fields = fd.getFieldList();
 
-    for(int index = 0;
-        index < fields.size();
-        index++) {
+    for (int index = 0; index < fields.size(); index++) {
       String field = fields.get(index);
       values.setFieldGeneric(field, inputEvent.get(getMapField(field)));
     }
@@ -77,13 +76,13 @@ public class AppDataSnapshotServerMap extends AbstractAppDataSnapshotServer<Map<
    */
   private String getMapField(String field)
   {
-    if(tableFieldToMapField == null) {
+    if (tableFieldToMapField == null) {
       return field;
     }
 
     String mapField = tableFieldToMapField.get(field);
 
-    if(mapField == null) {
+    if (mapField == null) {
       return field;
     }
 
@@ -111,7 +110,7 @@ public class AppDataSnapshotServerMap extends AbstractAppDataSnapshotServer<Map<
   {
     Preconditions.checkNotNull(tableFieldToMapField);
 
-    for(Map.Entry<String, String> entry: tableFieldToMapField.entrySet()) {
+    for (Map.Entry<String, String> entry : tableFieldToMapField.entrySet()) {
       Preconditions.checkNotNull(entry.getKey());
       Preconditions.checkNotNull(entry.getValue());
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerPOJO.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerPOJO.java b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerPOJO.java
index 68d9017..ab3c760 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerPOJO.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerPOJO.java
@@ -77,7 +77,7 @@ public class AppDataSnapshotServerPOJO extends AbstractAppDataSnapshotServer<Obj
    */
   private void firstTuple(Object inputEvent)
   {
-    if(firstTupleProcessed) {
+    if (firstTupleProcessed) {
       return;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/codec/JavaSerializationStreamCodec.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/codec/JavaSerializationStreamCodec.java b/library/src/main/java/com/datatorrent/lib/codec/JavaSerializationStreamCodec.java
index b82fc14..1096a72 100644
--- a/library/src/main/java/com/datatorrent/lib/codec/JavaSerializationStreamCodec.java
+++ b/library/src/main/java/com/datatorrent/lib/codec/JavaSerializationStreamCodec.java
@@ -41,37 +41,37 @@ import com.datatorrent.netlet.util.Slice;
  */
 public class JavaSerializationStreamCodec<T extends Serializable> implements StreamCodec<T>, Serializable
 {
-	@Override
-	public Object fromByteArray(Slice fragment)
-	{
-		ByteArrayInputStream bis = new ByteArrayInputStream(fragment.buffer,
-				fragment.offset, fragment.length);
-		try {
-			ObjectInputStream ois = new ObjectInputStream(bis);
-			return ois.readObject();
-		} catch (Exception ioe) {
-			throw new RuntimeException(ioe);
-		}
-	}
+  @Override
+  public Object fromByteArray(Slice fragment)
+  {
+    ByteArrayInputStream bis = new ByteArrayInputStream(fragment.buffer,
+        fragment.offset, fragment.length);
+    try {
+      ObjectInputStream ois = new ObjectInputStream(bis);
+      return ois.readObject();
+    } catch (Exception ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
 
-	@Override
-	public Slice toByteArray(T object)
-	{
-		ByteArrayOutputStream bos = new ByteArrayOutputStream();
-		try {
-			ObjectOutputStream oos = new ObjectOutputStream(bos);
-			oos.writeObject(object);
-			oos.flush();
-			byte[] buffer = bos.toByteArray();
-			return new Slice(buffer, 0, buffer.length);
-		} catch (IOException ex) {
-			throw new RuntimeException(ex);
-		}
-	}
+  @Override
+  public Slice toByteArray(T object)
+  {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    try {
+      ObjectOutputStream oos = new ObjectOutputStream(bos);
+      oos.writeObject(object);
+      oos.flush();
+      byte[] buffer = bos.toByteArray();
+      return new Slice(buffer, 0, buffer.length);
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
 
-	@Override
-	public int getPartition(T o)
-	{
-		return o.hashCode();
-	}
+  @Override
+  public int getPartition(T o)
+  {
+    return o.hashCode();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/converter/ByteArrayToStringConverterOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/converter/ByteArrayToStringConverterOperator.java b/library/src/main/java/com/datatorrent/lib/converter/ByteArrayToStringConverterOperator.java
index 7342cfd..33e5bf6 100644
--- a/library/src/main/java/com/datatorrent/lib/converter/ByteArrayToStringConverterOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/converter/ByteArrayToStringConverterOperator.java
@@ -18,10 +18,11 @@
  */
 package com.datatorrent.lib.converter;
 
-import com.datatorrent.common.util.BaseOperator;
+import java.nio.charset.Charset;
+
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
-import java.nio.charset.Charset;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * This operator converts Byte Array to String. User gets the option of providing character Encoding.
@@ -55,7 +56,7 @@ public class ByteArrayToStringConverterOperator extends BaseOperator
     @Override
     public void process(byte[] message)
     {
-      output.emit(characterEncoding == null? new String(message): new String(message, characterEncoding));
+      output.emit(characterEncoding == null ? new String(message) : new String(message, characterEncoding));
     }
 
   };

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/converter/Converter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/converter/Converter.java b/library/src/main/java/com/datatorrent/lib/converter/Converter.java
index f1d4325..ef999e4 100644
--- a/library/src/main/java/com/datatorrent/lib/converter/Converter.java
+++ b/library/src/main/java/com/datatorrent/lib/converter/Converter.java
@@ -36,8 +36,7 @@ public interface Converter<INPUT, OUTPUT>
    * Provide the implementation for converting tuples from one format to the
    * other
    * 
-   * @param INPUT
-   *          tuple of certain format
+   * @param tuple tuple of certain format
    * @return OUTPUT tuple of converted format
    */
   public OUTPUT convert(INPUT tuple);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/converter/MapToKeyHashValuePairConverter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/converter/MapToKeyHashValuePairConverter.java b/library/src/main/java/com/datatorrent/lib/converter/MapToKeyHashValuePairConverter.java
index bb09cd8..52d3273 100644
--- a/library/src/main/java/com/datatorrent/lib/converter/MapToKeyHashValuePairConverter.java
+++ b/library/src/main/java/com/datatorrent/lib/converter/MapToKeyHashValuePairConverter.java
@@ -21,9 +21,9 @@ package com.datatorrent.lib.converter;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.lib.util.KeyHashValPair;
 
 /**
@@ -36,7 +36,8 @@ import com.datatorrent.lib.util.KeyHashValPair;
  *
  * @since 3.0.0
  */
-public class MapToKeyHashValuePairConverter<K, V> extends BaseOperator {
+public class MapToKeyHashValuePairConverter<K, V> extends BaseOperator
+{
 
   /**
    * Input port which accepts Map<K, V>.
@@ -46,8 +47,7 @@ public class MapToKeyHashValuePairConverter<K, V> extends BaseOperator {
     @Override
     public void process(Map<K, V> tuple)
     {
-      for(Entry<K, V> entry:tuple.entrySet())
-      {
+      for (Entry<K, V> entry : tuple.entrySet()) {
         output.emit(new KeyHashValPair<K, V>(entry.getKey(), entry.getValue()));
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/converter/MapToKeyValuePairConverter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/converter/MapToKeyValuePairConverter.java b/library/src/main/java/com/datatorrent/lib/converter/MapToKeyValuePairConverter.java
index f7f63ed..efab4a5 100644
--- a/library/src/main/java/com/datatorrent/lib/converter/MapToKeyValuePairConverter.java
+++ b/library/src/main/java/com/datatorrent/lib/converter/MapToKeyValuePairConverter.java
@@ -21,11 +21,10 @@ package com.datatorrent.lib.converter;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import com.datatorrent.lib.util.KeyValPair;
-
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
 
 /**
  *
@@ -37,7 +36,8 @@ import com.datatorrent.api.DefaultOutputPort;
  *
  * @since 3.0.0
  */
-public class MapToKeyValuePairConverter<K, V> extends BaseOperator {
+public class MapToKeyValuePairConverter<K, V> extends BaseOperator
+{
 
   /**
    * Input port which accepts Map<K, V>.
@@ -47,8 +47,7 @@ public class MapToKeyValuePairConverter<K, V> extends BaseOperator {
     @Override
     public void process(Map<K, V> tuple)
     {
-      for(Entry<K, V> entry:tuple.entrySet())
-      {
+      for (Entry<K, V> entry : tuple.entrySet()) {
         output.emit(new KeyValPair<K, V>(entry.getKey(), entry.getValue()));
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/converter/StringValueToNumberConverterForMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/converter/StringValueToNumberConverterForMap.java b/library/src/main/java/com/datatorrent/lib/converter/StringValueToNumberConverterForMap.java
index ad93868..e39259e 100644
--- a/library/src/main/java/com/datatorrent/lib/converter/StringValueToNumberConverterForMap.java
+++ b/library/src/main/java/com/datatorrent/lib/converter/StringValueToNumberConverterForMap.java
@@ -22,9 +22,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  *
@@ -36,7 +36,8 @@ import com.datatorrent.api.DefaultOutputPort;
  *
  * @since 3.0.0
  */
-public class StringValueToNumberConverterForMap<K> extends BaseOperator {
+public class StringValueToNumberConverterForMap<K> extends BaseOperator
+{
 
   /**
    * Input port which accepts Map<K, Numeric String>.
@@ -47,8 +48,7 @@ public class StringValueToNumberConverterForMap<K> extends BaseOperator {
     public void process(Map<K, String> tuple)
     {
       Map<K, Number> outputMap = new HashMap<K, Number>();
-      for(Entry<K, String> entry:tuple.entrySet())
-      {
+      for (Entry<K, String> entry : tuple.entrySet()) {
         String val = entry.getValue();
         if (val == null) {
           return;
@@ -57,12 +57,10 @@ public class StringValueToNumberConverterForMap<K> extends BaseOperator {
         boolean errortuple = false;
         try {
           tvalue = Double.parseDouble(val);
-        }
-        catch (NumberFormatException e) {
+        } catch (NumberFormatException e) {
           errortuple = true;
         }
-        if(!errortuple)
-        {
+        if (!errortuple) {
           outputMap.put(entry.getKey(), tvalue);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/counters/BasicCounters.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/counters/BasicCounters.java b/library/src/main/java/com/datatorrent/lib/counters/BasicCounters.java
index 60e2d8e..255ec03 100644
--- a/library/src/main/java/com/datatorrent/lib/counters/BasicCounters.java
+++ b/library/src/main/java/com/datatorrent/lib/counters/BasicCounters.java
@@ -25,9 +25,6 @@ import java.util.Map;
 
 import javax.annotation.Nonnull;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.map.JsonSerializer;
 import org.codehaus.jackson.map.SerializerProvider;
@@ -35,8 +32,10 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
 
 import org.apache.commons.lang.mutable.Mutable;
 
-import com.datatorrent.api.Context;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 
+import com.datatorrent.api.Context;
 import com.datatorrent.common.util.NumberAggregate;
 
 /**
@@ -142,7 +141,7 @@ public class BasicCounters<T extends Number & Mutable> implements Serializable
       for (Object counter : objects) {
         if (counter instanceof BasicCounters) {
           @SuppressWarnings("unchecked")
-          BasicCounters<T> physical = (BasicCounters<T>) counter;
+          BasicCounters<T> physical = (BasicCounters<T>)counter;
           ImmutableMap<Enum<?>, T> copy = physical.getCopy();
 
           for (Map.Entry<Enum<?>, T> entry : copy.entrySet()) {
@@ -178,7 +177,7 @@ public class BasicCounters<T extends Number & Mutable> implements Serializable
       for (Object counter : objects) {
         if (counter instanceof BasicCounters) {
           @SuppressWarnings("unchecked")
-          BasicCounters<T> physical = (BasicCounters<T>) counter;
+          BasicCounters<T> physical = (BasicCounters<T>)counter;
           ImmutableMap<Enum<?>, T> copy = physical.getCopy();
 
           for (Map.Entry<Enum<?>, T> entry : copy.entrySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/db/AbstractAggregateTransactionableKeyValueStoreOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractAggregateTransactionableKeyValueStoreOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractAggregateTransactionableKeyValueStoreOutputOperator.java
index ba89f7b..5225329 100644
--- a/library/src/main/java/com/datatorrent/lib/db/AbstractAggregateTransactionableKeyValueStoreOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/AbstractAggregateTransactionableKeyValueStoreOutputOperator.java
@@ -34,6 +34,6 @@ package com.datatorrent.lib.db;
  */
 @org.apache.hadoop.classification.InterfaceStability.Evolving
 public abstract class AbstractAggregateTransactionableKeyValueStoreOutputOperator<T, S extends TransactionableKeyValueStore>
-        extends AbstractAggregateTransactionableStoreOutputOperator<T, S>
+    extends AbstractAggregateTransactionableStoreOutputOperator<T, S>
 {
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/db/AbstractBatchTransactionableStoreOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractBatchTransactionableStoreOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractBatchTransactionableStoreOutputOperator.java
index 771e679..fbb924a 100644
--- a/library/src/main/java/com/datatorrent/lib/db/AbstractBatchTransactionableStoreOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/AbstractBatchTransactionableStoreOutputOperator.java
@@ -35,10 +35,13 @@ import com.google.common.collect.Lists;
  * @since 1.0.2
  */
 @org.apache.hadoop.classification.InterfaceStability.Evolving
-public abstract class AbstractBatchTransactionableStoreOutputOperator<T, S extends TransactionableStore> extends AbstractAggregateTransactionableStoreOutputOperator<T, S> {
+public abstract class AbstractBatchTransactionableStoreOutputOperator<T, S extends TransactionableStore>
+    extends AbstractAggregateTransactionableStoreOutputOperator<T, S>
+{
 
   private Collection<T> tuples;
-  public AbstractBatchTransactionableStoreOutputOperator(){
+  public AbstractBatchTransactionableStoreOutputOperator()
+  {
     tuples = Lists.newArrayList();
   }
 
@@ -62,7 +65,8 @@ public abstract class AbstractBatchTransactionableStoreOutputOperator<T, S exten
   public abstract void processBatch(Collection<T> tuples);
 
   @Override
-  public void storeAggregate() {
+  public void storeAggregate()
+  {
     processBatch(tuples);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/db/AbstractKeyValueStoreInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractKeyValueStoreInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractKeyValueStoreInputOperator.java
index 9b6ddc2..f3f8c63 100644
--- a/library/src/main/java/com/datatorrent/lib/db/AbstractKeyValueStoreInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/AbstractKeyValueStoreInputOperator.java
@@ -18,7 +18,11 @@
  */
 package com.datatorrent.lib.db;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 /**
  * This is the base implementation of an input operator which consumes data from a key value store.&nbsp;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableKeyValueStoreOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableKeyValueStoreOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableKeyValueStoreOutputOperator.java
index f199986..fe828dc 100644
--- a/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableKeyValueStoreOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableKeyValueStoreOutputOperator.java
@@ -36,6 +36,6 @@ package com.datatorrent.lib.db;
  */
 @org.apache.hadoop.classification.InterfaceStability.Evolving
 public abstract class AbstractPassThruTransactionableKeyValueStoreOutputOperator<T, S extends TransactionableKeyValueStore>
-        extends AbstractPassThruTransactionableStoreOutputOperator<T, S>
+    extends AbstractPassThruTransactionableStoreOutputOperator<T, S>
 {
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java
index 55c8617..b471a63 100644
--- a/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java
@@ -35,7 +35,8 @@ package com.datatorrent.lib.db;
  * @since 0.9.3
  */
 @org.apache.hadoop.classification.InterfaceStability.Evolving
-public abstract class AbstractPassThruTransactionableStoreOutputOperator<T, S extends TransactionableStore> extends AbstractTransactionableStoreOutputOperator<T, S>
+public abstract class AbstractPassThruTransactionableStoreOutputOperator<T, S extends TransactionableStore>
+    extends AbstractTransactionableStoreOutputOperator<T, S>
 {
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/db/AbstractStoreInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractStoreInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractStoreInputOperator.java
index a6eb780..18e23f4 100644
--- a/library/src/main/java/com/datatorrent/lib/db/AbstractStoreInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/AbstractStoreInputOperator.java
@@ -41,7 +41,7 @@ public abstract class AbstractStoreInputOperator<T, S extends Connectable> imple
   /**
    * The output port on which tuples read form a store are emitted.
    */
-  final public transient DefaultOutputPort<T> outputPort = new DefaultOutputPort<T>();
+  public final transient DefaultOutputPort<T> outputPort = new DefaultOutputPort<T>();
   protected S store;
   /**
    * Gets the store.
@@ -79,8 +79,7 @@ public abstract class AbstractStoreInputOperator<T, S extends Connectable> imple
   {
     try {
       store.connect();
-    }
-    catch (IOException ex) {
+    } catch (IOException ex) {
       throw new RuntimeException(ex);
     }
   }
@@ -90,8 +89,7 @@ public abstract class AbstractStoreInputOperator<T, S extends Connectable> imple
   {
     try {
       store.disconnect();
-    }
-    catch (IOException ex) {
+    } catch (IOException ex) {
       // ignore
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/db/AbstractStoreOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractStoreOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractStoreOutputOperator.java
index aff5b3e..2ef8d30 100644
--- a/library/src/main/java/com/datatorrent/lib/db/AbstractStoreOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/AbstractStoreOutputOperator.java
@@ -20,10 +20,10 @@ package com.datatorrent.lib.db;
 
 import java.io.IOException;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * This is the base implementation of an output operator,
@@ -80,8 +80,7 @@ public abstract class AbstractStoreOutputOperator<T, S extends Connectable> exte
   {
     try {
       store.connect();
-    }
-    catch (IOException ex) {
+    } catch (IOException ex) {
       throw new RuntimeException(ex);
     }
   }
@@ -96,8 +95,7 @@ public abstract class AbstractStoreOutputOperator<T, S extends Connectable> exte
   {
     try {
       store.disconnect();
-    }
-    catch (IOException ex) {
+    } catch (IOException ex) {
       throw new RuntimeException(ex);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/db/AbstractTransactionableStoreOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractTransactionableStoreOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractTransactionableStoreOutputOperator.java
index 61682c2..037a8b2 100644
--- a/library/src/main/java/com/datatorrent/lib/db/AbstractTransactionableStoreOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/AbstractTransactionableStoreOutputOperator.java
@@ -20,11 +20,11 @@ package com.datatorrent.lib.db;
 
 import java.io.IOException;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * This is the base implementation of an output operator,
@@ -95,8 +95,7 @@ public abstract class AbstractTransactionableStoreOutputOperator<T, S extends Tr
       appId = context.getValue(DAG.APPLICATION_ID);
       operatorId = context.getId();
       committedWindowId = store.getCommittedWindowId(appId, operatorId);
-    }
-    catch (IOException ex) {
+    } catch (IOException ex) {
       throw new RuntimeException(ex);
     }
   }
@@ -115,8 +114,7 @@ public abstract class AbstractTransactionableStoreOutputOperator<T, S extends Tr
         store.rollbackTransaction();
       }
       store.disconnect();
-    }
-    catch (IOException ex) {
+    } catch (IOException ex) {
       throw new RuntimeException(ex);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcNonTransactionableBatchOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcNonTransactionableBatchOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcNonTransactionableBatchOutputOperator.java
index 03f7719..7dee870 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcNonTransactionableBatchOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcNonTransactionableBatchOutputOperator.java
@@ -18,15 +18,19 @@
  */
 package com.datatorrent.lib.db.jdbc;
 
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DAG;
-import com.google.common.collect.Lists;
 import java.sql.SQLException;
 import java.util.List;
+
 import javax.validation.constraints.Min;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+
 /**
  * A generic output operator which updates the database without using transactions
  * and batches writes to increase performance. This operator satisfies the exactly once constraint
@@ -38,7 +42,7 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class AbstractJdbcNonTransactionableBatchOutputOperator<T, S extends JdbcNonTransactionalStore> extends AbstractJdbcNonTransactionableOutputOperator<T, S>
 {
-  private static transient final Logger LOG = LoggerFactory.getLogger(AbstractJdbcNonTransactionableBatchOutputOperator.class);
+  private static final transient Logger LOG = LoggerFactory.getLogger(AbstractJdbcNonTransactionableBatchOutputOperator.class);
   public static final int DEFAULT_BATCH_SIZE = 1000;
 
   @Min(1)
@@ -93,7 +97,7 @@ public abstract class AbstractJdbcNonTransactionableBatchOutputOperator<T, S ext
 
     mode = context.getValue(OperatorContext.PROCESSING_MODE);
 
-    if(mode==ProcessingMode.AT_MOST_ONCE){
+    if (mode == ProcessingMode.AT_MOST_ONCE) {
       //Batch must be cleared to avoid writing same data twice
       tuples.clear();
     }
@@ -129,7 +133,7 @@ public abstract class AbstractJdbcNonTransactionableBatchOutputOperator<T, S ext
     super.endWindow();
 
     //This window is done so write it to the database.
-    if(committedWindowId < currentWindowId) {
+    if (committedWindowId < currentWindowId) {
       store.storeCommittedWindowId(appId, operatorId, currentWindowId);
       committedWindowId = currentWindowId;
     }
@@ -139,7 +143,7 @@ public abstract class AbstractJdbcNonTransactionableBatchOutputOperator<T, S ext
   public void processTuple(T tuple)
   {
     //Minimize duplicated data in the atleast once case
-    if(committedWindowId >= currentWindowId) {
+    if (committedWindowId >= currentWindowId) {
       return;
     }
 
@@ -149,7 +153,7 @@ public abstract class AbstractJdbcNonTransactionableBatchOutputOperator<T, S ext
       setStatementParameters(updateCommand, tuple);
       updateCommand.addBatch();
 
-      if(tuples.size() >= batchSize) {
+      if (tuples.size() >= batchSize) {
         tuples.clear();
         updateCommand.executeBatch();
         updateCommand.clearBatch();