You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/01/22 19:24:06 UTC

[2/3] apex-malhar git commit: fixing all checkstyle violations, delete maxAllowedViolations from pom

fixing all checkstyle violations,
delete maxAllowedViolations from pom


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/5528a4c6
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/5528a4c6
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/5528a4c6

Branch: refs/heads/master
Commit: 5528a4c639a87dbfaba4a2bae68ac99971c66224
Parents: 4cbbb75
Author: Apex Dev <de...@apex.apache.org>
Authored: Wed Jan 18 14:43:33 2017 -0800
Committer: Oliver W <ol...@datatorrent.com>
Committed: Fri Jan 20 11:12:20 2017 -0800

----------------------------------------------------------------------
 benchmark/pom.xml                               |  8 ----
 .../datatorrent/benchmark/ApplicationFixed.java | 15 +++---
 .../com/datatorrent/benchmark/Benchmark.java    | 17 +++----
 .../benchmark/CouchBaseAppInput.java            |  6 +--
 .../benchmark/CouchBaseAppOutput.java           |  8 ++--
 .../benchmark/CouchBaseInputOperator.java       | 17 ++++---
 .../benchmark/FixedTuplesInputOperator.java     | 11 +++--
 .../datatorrent/benchmark/RandomMapOutput.java  | 40 +++++++++-------
 .../benchmark/RandomWordInputModule.java        | 11 +++--
 .../benchmark/WordCountOperator.java            | 10 ++--
 .../AerospikeOutputBenchmarkApplication.java    | 17 ++++---
 .../aerospike/AerospikeOutputOperator.java      | 13 ++++--
 .../UniqueValueCountBenchmarkApplication.java   | 21 +++++----
 .../CassandraOutputBenchmarkApplication.java    | 17 ++++---
 .../cassandra/CassandraOutputOperator.java      | 14 +++---
 .../benchmark/fs/FSByteOutputOperator.java      |  9 ++--
 .../benchmark/fs/FSOutputOperatorBenchmark.java | 26 +++++++----
 .../hive/HiveInsertBenchmarkingApp.java         | 28 ++++++-----
 .../hive/HiveMapInsertBenchmarkingApp.java      | 31 +++++++------
 .../kafka/BenchmarkKafkaInputOperator.java      |  6 +--
 ...nchmarkPartitionableKafkaOutputOperator.java | 44 +++++++++---------
 .../benchmark/kafka/KafkaInputBenchmark.java    | 23 ++++-----
 .../benchmark/kafka/KafkaOutputBenchmark.java   |  5 +-
 .../benchmark/kafka/KafkaTestPartitioner.java   |  7 +--
 .../RubyOperatorBenchmarkApplication.java       | 11 +++--
 .../spillable/SpillableTestOperator.java        |  3 +-
 .../state/ManagedStateBenchmarkApp.java         |  3 +-
 .../benchmark/state/StoreOperator.java          |  9 ++--
 .../stream/DevNullCounterBenchmark.java         | 13 +++---
 .../benchmark/stream/IntegerOperator.java       | 17 ++++---
 .../benchmark/stream/StreamDuplicaterApp.java   | 21 +++++----
 .../benchmark/stream/StreamMergeApp.java        |  5 +-
 .../benchmark/testbench/EventClassifierApp.java | 11 +++--
 .../EventClassifierNumberToHashDoubleApp.java   | 18 ++++---
 .../benchmark/testbench/EventGeneratorApp.java  |  9 ++--
 .../testbench/EventIncrementerApp.java          | 22 +++++----
 .../testbench/FilterClassifierApp.java          | 15 +++---
 .../testbench/FilteredEventClassifierApp.java   |  8 ++--
 .../benchmark/testbench/HashMapOperator.java    | 37 +++++++++------
 .../testbench/RandomEventGeneratorApp.java      |  8 ++--
 .../testbench/SeedEventGeneratorApp.java        | 14 ++++--
 .../testbench/ThroughputCounterApp.java         | 13 ++++--
 .../AbstractWindowedOperatorBenchmarkApp.java   | 10 ++--
 .../KeyedWindowedOperatorBenchmarkApp.java      |  9 ++--
 .../window/WindowedOperatorBenchmarkApp.java    |  6 ++-
 .../benchmark/ApplicationFixedTest.java         | 17 ++++---
 .../datatorrent/benchmark/BenchmarkTest.java    |  3 +-
 .../benchmark/CouchBaseBenchmarkTest.java       | 13 +++---
 .../benchmark/accumulo/AccumuloApp.java         | 17 ++++---
 .../benchmark/accumulo/AccumuloAppTest.java     |  8 ++--
 .../aerospike/AerospikeBenchmarkAppTest.java    |  7 +--
 .../algo/UniqueValueCountBenchmarkTest.java     |  6 ++-
 .../cassandra/CassandraApplicatonTest.java      |  8 ++--
 .../benchmark/hbase/HBaseApplicationTest.java   |  9 ++--
 .../hbase/HBaseCsvMappingApplication.java       |  7 +--
 .../benchmark/hive/HiveInsertBenchmarkTest.java | 37 +++++++++------
 .../benchmark/hive/HiveMapBenchmarkTest.java    | 38 ++++++++-------
 .../kafka/KafkaInputBenchmarkTest.java          |  3 +-
 .../kafka/KafkaOutputBenchmarkTest.java         |  4 +-
 .../benchmark/memsql/MemsqlInputBenchmark.java  | 21 +++++----
 .../memsql/MemsqlInputBenchmarkTest.java        | 49 ++++++++++++--------
 .../benchmark/memsql/MemsqlOutputBenchmark.java | 26 ++++++-----
 .../memsql/MemsqlOutputBenchmarkTest.java       | 21 +++++----
 .../script/RubyOperatorBenchmarkAppTest.java    |  7 ++-
 .../spillable/SpillableDSBenchmarkTest.java     |  6 +--
 .../state/ManagedStateBenchmarkAppTest.java     |  3 --
 .../testbench/EventClassifierAppTest.java       | 12 +++--
 ...ventClassifierNumberToHashDoubleAppTest.java | 12 +++--
 .../testbench/EventGeneratorAppTest.java        | 12 +++--
 .../testbench/EventIncrementerAppTest.java      | 12 +++--
 .../testbench/FilterClassifierAppTest.java      | 10 ++--
 .../FilteredEventClassifierAppTest.java         | 12 +++--
 .../testbench/ThroughputCounterAppTest.java     | 12 +++--
 .../util/serde/GenericSerdePerformanceTest.java |  7 +--
 74 files changed, 608 insertions(+), 457 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/pom.xml
----------------------------------------------------------------------
diff --git a/benchmark/pom.xml b/benchmark/pom.xml
index b2e0981..4bbd5ac 100644
--- a/benchmark/pom.xml
+++ b/benchmark/pom.xml
@@ -143,14 +143,6 @@
            <skip>true</skip>
          </configuration>
        </plugin>
-       <plugin>
-         <groupId>org.apache.maven.plugins</groupId>
-         <artifactId>maven-checkstyle-plugin</artifactId>
-         <configuration>
-           <maxAllowedViolations>281</maxAllowedViolations>
-           <logViolationsToConsole>${checkstyle.console}</logViolationsToConsole>
-         </configuration>
-       </plugin>
     </plugins>
   </build>
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java b/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java
index 53f01fc..aa10eea 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java
@@ -18,13 +18,15 @@
  */
 package com.datatorrent.benchmark;
 
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.DAG;
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context.PortContext;
+
+import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.api.StreamingApplication;
 
-import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
 
 /**
  * Example of application configuration in Java.
@@ -34,7 +36,7 @@ import org.apache.hadoop.conf.Configuration;
  *
  * @since 0.3.2
  */
-@ApplicationAnnotation(name="PerformanceBenchmarkForFixedNumberOfTuples")
+@ApplicationAnnotation(name = "PerformanceBenchmarkForFixedNumberOfTuples")
 public class ApplicationFixed implements StreamingApplication
 {
   private final Locality locality = null;
@@ -44,7 +46,8 @@ public class ApplicationFixed implements StreamingApplication
   public void populateDAG(DAG dag, Configuration conf)
   {
     FixedTuplesInputOperator wordGenerator = dag.addOperator("WordGenerator", FixedTuplesInputOperator.class);
-    dag.getMeta(wordGenerator).getMeta(wordGenerator.output).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+    dag.getMeta(wordGenerator).getMeta(wordGenerator.output).getAttributes()
+        .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
     wordGenerator.setCount(500000);
 
     WordCountOperator<byte[]> counter = dag.addOperator("Counter", new WordCountOperator<byte[]>());

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java
index 5649914..d8d51b8 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java
@@ -52,10 +52,10 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
  * @since 0.9.0
  */
 
-@ApplicationAnnotation(name="PerformanceBenchmarkingApp")
+@ApplicationAnnotation(name = "PerformanceBenchmarkingApp")
 public abstract class Benchmark
 {
-  static abstract class AbstractApplication implements StreamingApplication
+  abstract static class AbstractApplication implements StreamingApplication
   {
     public static final int QUEUE_CAPACITY = 32 * 1024;
 
@@ -63,7 +63,8 @@ public abstract class Benchmark
     public void populateDAG(DAG dag, Configuration conf)
     {
       RandomWordInputModule wordGenerator = dag.addOperator("wordGenerator", RandomWordInputModule.class);
-      dag.getMeta(wordGenerator).getMeta(wordGenerator.output).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+      dag.getMeta(wordGenerator).getMeta(wordGenerator.output).getAttributes()
+          .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
 
       WordCountOperator<byte[]> counter = dag.addOperator("counter", new WordCountOperator<byte[]>());
       dag.getMeta(counter).getMeta(counter.input).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
@@ -77,7 +78,7 @@ public abstract class Benchmark
   /**
    * Let the engine decide how to best place the 2 operators.
    */
-  @ApplicationAnnotation(name="PerformanceBenchmarkNoLocality")
+  @ApplicationAnnotation(name = "PerformanceBenchmarkNoLocality")
   public static class NoLocality extends AbstractApplication
   {
     @Override
@@ -92,7 +93,7 @@ public abstract class Benchmark
    * Place the 2 operators so that they are in the same Rack.
    * The operators are requested to be deployed on different machines.
    */
-  @ApplicationAnnotation(name="PerformanceBenchmarkRackLocal")
+  @ApplicationAnnotation(name = "PerformanceBenchmarkRackLocal")
   public static class RackLocal extends AbstractApplication
   {
     @Override
@@ -107,7 +108,7 @@ public abstract class Benchmark
    * Place the 2 operators so that they are in the same node.
    * The operators are requested to be deployed as different processes within the same machine.
    */
-  @ApplicationAnnotation(name="PerformanceBenchmarkNodeLocal")
+  @ApplicationAnnotation(name = "PerformanceBenchmarkNodeLocal")
   public static class NodeLocal extends AbstractApplication
   {
     @Override
@@ -122,7 +123,7 @@ public abstract class Benchmark
    * Place the 2 operators so that they are in the same container.
    * The operators are deployed as different threads in the same process.
    */
-  @ApplicationAnnotation(name="PerformanceBenchmarkContainerLocal")
+  @ApplicationAnnotation(name = "PerformanceBenchmarkContainerLocal")
   public static class ContainerLocal extends AbstractApplication
   {
     @Override
@@ -136,7 +137,7 @@ public abstract class Benchmark
   /**
    * Place the 2 operators so that they are in the same thread.
    */
-  @ApplicationAnnotation(name="PerformanceBenchmarkThreadLocal")
+  @ApplicationAnnotation(name = "PerformanceBenchmarkThreadLocal")
   public static class ThreadLocal extends AbstractApplication
   {
     @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java
index 6096530..bf5b876 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java
@@ -18,14 +18,14 @@
  */
 package com.datatorrent.benchmark;
 
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
 import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 
 /**
- *
  * Application to benchmark the performance of couchbase input operator.
  * The number of tuples processed per second were around 9000.
  *

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java
index f789d08..4f12791 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java
@@ -18,12 +18,14 @@
  */
 package com.datatorrent.benchmark;
 
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.lib.testbench.RandomEventGenerator;
 import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
+
+import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
 
 /**
  *

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java
index 923b588..8ae0a94 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java
@@ -18,12 +18,14 @@
  */
 package com.datatorrent.benchmark;
 
-import com.datatorrent.contrib.couchbase.AbstractCouchBaseInputOperator;
-import com.datatorrent.contrib.couchbase.CouchBaseWindowStore;
 import java.util.ArrayList;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.contrib.couchbase.AbstractCouchBaseInputOperator;
+import com.datatorrent.contrib.couchbase.CouchBaseWindowStore;
+
 /**
  * <p>CouchBaseInputOperator class.</p>
  *
@@ -32,14 +34,15 @@ import org.slf4j.LoggerFactory;
 public class CouchBaseInputOperator extends AbstractCouchBaseInputOperator<String>
 {
   private static final Logger logger = LoggerFactory.getLogger(CouchBaseWindowStore.class);
+
   @Override
   public String getTuple(Object object)
   {
-    if(object!=null)
-    return object.toString();
-    else{
-     logger.info("Object returned is null");
-     return "null";
+    if (object != null) {
+      return object.toString();
+    } else {
+      logger.info("Object returned is null");
+      return "null";
     }
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java
index ad7f8c1..f2582bd 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java
@@ -18,14 +18,15 @@
  */
 package com.datatorrent.benchmark;
 
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.Context.OperatorContext;
+import java.util.ArrayList;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
+import com.datatorrent.api.Context.OperatorContext;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
 
 /**
  * <p>FixedTuplesInputOperator class.</p>
@@ -44,7 +45,7 @@ public class FixedTuplesInputOperator implements InputOperator
   {
     if (firstTime) {
       long start = System.currentTimeMillis();
-      for (int i = count; i-- > 0;) {
+      for (int i = count; i-- > 0; ) {
         output.emit(new byte[64]);
       }
       firstTime = false;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java b/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java
index 106bd79..3342771 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java
@@ -19,20 +19,23 @@
 package com.datatorrent.benchmark;
 
 import java.util.HashMap;
-import com.datatorrent.common.util.BaseOperator;
+
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * Operator that outputs random values in a map.
  *
  * @since 1.0.4
  */
-public class RandomMapOutput extends BaseOperator {
+public class RandomMapOutput extends BaseOperator
+{
 
-  public final transient DefaultOutputPort<HashMap<String, Object>> map_data = new DefaultOutputPort<HashMap<String, Object>>();
+  public final transient DefaultOutputPort<HashMap<String, Object>> map_data =
+      new DefaultOutputPort<HashMap<String, Object>>();
   public final transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>()
-      {
+  {
     @Override
     public void process(Integer tuple)
     {
@@ -40,22 +43,25 @@ public class RandomMapOutput extends BaseOperator {
       map.put(key, tuple);
       RandomMapOutput.this.process(map);
     }
-      };
+  };
 
-      private String key;
+  private String key;
 
-      public String getKey() {
-        return key;
-      }
+  public String getKey()
+  {
+    return key;
+  }
 
-      public void setKey(String key) {
-        this.key = key;
-      }
+  public void setKey(String key)
+  {
+    this.key = key;
+  }
 
-      public void process(HashMap<String, Object> tuple) {
+  public void process(HashMap<String, Object> tuple)
+  {
 
-        if (map_data.isConnected()) {
-          map_data.emit(tuple);
-        }
-      }
+    if (map_data.isConnected()) {
+      map_data.emit(tuple);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java b/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java
index 11c7568..7d02de2 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java
@@ -18,10 +18,12 @@
  */
 package com.datatorrent.benchmark;
 
+import javax.validation.constraints.Min;
+
+import com.datatorrent.api.Context.OperatorContext;
+
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.Context.OperatorContext;
-import javax.validation.constraints.Min;
 
 /**
  * <p>
@@ -87,7 +89,6 @@ public class RandomWordInputModule implements InputOperator
     return emitSameTuple;
   }
 
-
   /**
    * Emits byte array of specified size.
    * Emits either the same byte array or creates new byte array every time
@@ -103,11 +104,11 @@ public class RandomWordInputModule implements InputOperator
     final boolean EMIT_SAME_TUPLE_COPY = emitSameTuple;
     if (firstTime) {
       if (EMIT_SAME_TUPLE_COPY) {
-        for (int i = count--; i-- > 0;) {
+        for (int i = count--; i-- > 0; ) {
           output.emit(sameTupleArray);
         }
       } else {
-        for (int i = count--; i-- > 0;) {
+        for (int i = count--; i-- > 0; ) {
           output.emit(new byte[TUPLE_SIZE_COPY]);
         }
       }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java
index 098ed42..6e91482 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java
@@ -21,14 +21,17 @@ package com.datatorrent.benchmark;
 /*
  * To change this template, choose Tools | Templates and open the template in the editor.
  */
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.Context.OperatorContext;
 
 import java.util.ArrayList;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.Context.OperatorContext;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+
 /**
  * <p>WordCountOperator class.</p>
  *
@@ -84,5 +87,6 @@ public class WordCountOperator<T> implements Operator
     counts = new ArrayList<Integer>();
     millis = new ArrayList<Integer>();
   }
+
   private static final Logger logger = LoggerFactory.getLogger(WordCountOperator.class);
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java b/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java
index a70aae6..0a880fd 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java
@@ -18,15 +18,16 @@
  */
 package com.datatorrent.benchmark.aerospike;
 
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.contrib.aerospike.AerospikeTransactionalStore;
 import com.datatorrent.lib.testbench.RandomEventGenerator;
-import org.apache.hadoop.conf.Configuration;
+
 /**
- *
  * Application to benchmark the performance of aerospike output operator.
  * The operator was tested on DT cluster and the number of tuples processed
  * by the operator per second were around 12,000
@@ -34,16 +35,18 @@ import org.apache.hadoop.conf.Configuration;
  * @since 1.0.4
  */
 
-
-@ApplicationAnnotation(name="AerospikeOutputOperatorBenchmark")
-public class AerospikeOutputBenchmarkApplication implements StreamingApplication {
+@ApplicationAnnotation(name = "AerospikeOutputOperatorBenchmark")
+public class AerospikeOutputBenchmarkApplication implements StreamingApplication
+{
 
   private final String NODE = "127.0.0.1";
   private final int PORT = 3000;
   private final String NAMESPACE = "test";
   private final Locality locality = null;
+
   @Override
-  public void populateDAG(DAG dag, Configuration conf) {
+  public void populateDAG(DAG dag, Configuration conf)
+  {
 
     RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
     rand.setMaxvalue(3000);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java
index 210e086..f9ee689 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java
@@ -23,25 +23,28 @@ import java.util.List;
 import com.aerospike.client.AerospikeException;
 import com.aerospike.client.Bin;
 import com.aerospike.client.Key;
-import com.datatorrent.contrib.aerospike.AbstractAerospikeTransactionalPutOperator;
 
+import com.datatorrent.contrib.aerospike.AbstractAerospikeTransactionalPutOperator;
 
 /**
  * <p>AerospikeOutputOperator class.</p>
  *
  * @since 1.0.4
  */
-public class AerospikeOutputOperator extends AbstractAerospikeTransactionalPutOperator<Integer>{
+public class AerospikeOutputOperator extends AbstractAerospikeTransactionalPutOperator<Integer>
+{
 
   private final String KEYSPACE = "test";
   private final String SET_NAME = "Aerospike_Output";
   private int id = 0;
+
   @Override
   protected Key getUpdatedBins(Integer tuple, List<Bin> bins)
-      throws AerospikeException {
+    throws AerospikeException
+  {
 
-    Key key = new Key(KEYSPACE,SET_NAME,id++);
-    bins.add(new Bin("ID",tuple));
+    Key key = new Key(KEYSPACE, SET_NAME, id++);
+    bins.add(new Bin("ID", tuple));
     return key;
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java b/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java
index f522396..f74311e 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java
@@ -18,23 +18,22 @@
  */
 package com.datatorrent.benchmark.algo;
 
-
 import org.apache.hadoop.conf.Configuration;
 
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+
 import com.datatorrent.lib.algo.UniqueCounter;
 import com.datatorrent.lib.converter.MapToKeyHashValuePairConverter;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.common.partitioner.StatelessPartitioner;
 
 import com.datatorrent.lib.stream.Counter;
 import com.datatorrent.lib.testbench.RandomEventGenerator;
 
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-
 /**
  * Application to demonstrate PartitionableUniqueCount operator. <br>
  * The input operator generate random keys, which is sent to
@@ -63,9 +62,11 @@ public class UniqueValueCountBenchmarkApplication implements StreamingApplicatio
 
     /* Initialize with three partition to start with */
     UniqueCounter<Integer> uniqCount = dag.addOperator("uniqevalue", new UniqueCounter<Integer>());
-    MapToKeyHashValuePairConverter<Integer, Integer> converter = dag.addOperator("converter", new MapToKeyHashValuePairConverter());
+    MapToKeyHashValuePairConverter<Integer, Integer> converter =
+        dag.addOperator("converter", new MapToKeyHashValuePairConverter());
 
-    dag.setAttribute(uniqCount, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<UniqueCounter<Integer>>(3));
+    dag.setAttribute(uniqCount, Context.OperatorContext.PARTITIONER,
+        new StatelessPartitioner<UniqueCounter<Integer>>(3));
     dag.setInputPortAttribute(uniqCount.data, Context.PortContext.PARTITION_PARALLEL, true);
     uniqCount.setCumulative(false);
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java b/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java
index dead2cd..46d503f 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java
@@ -19,24 +19,27 @@
 package com.datatorrent.benchmark.cassandra;
 
 import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.lib.testbench.RandomEventGenerator;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
+
 import com.datatorrent.api.annotation.ApplicationAnnotation;
+
 import com.datatorrent.contrib.cassandra.CassandraTransactionalStore;
 
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+
 /**
- *
- *Application to benchmark the performance of cassandra output operator.
- *The operator was tested on following configuration:
- *Virtual Box with 10GB ram, 4 processor cores on an i7 machine with 16GB ram
- *The number of tuples processed per second were around 20,000
+ * Application to benchmark the performance of cassandra output operator.
+ * The operator was tested on following configuration:
+ * Virtual Box with 10GB ram, 4 processor cores on an i7 machine with 16GB ram
+ * The number of tuples processed per second were around 20,000
  *
  * @since 1.0.3
  */
 
-@ApplicationAnnotation(name="CassandraOperatorDemo")
+@ApplicationAnnotation(name = "CassandraOperatorDemo")
 public class CassandraOutputBenchmarkApplication implements StreamingApplication
 {
   private final Locality locality = null;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java
index 666746b..592d8a2 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java
@@ -18,34 +18,36 @@
  */
 package com.datatorrent.benchmark.cassandra;
 
-
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.exceptions.DriverException;
-import com.datatorrent.contrib.cassandra.AbstractCassandraTransactionableOutputOperator;
 
+import com.datatorrent.contrib.cassandra.AbstractCassandraTransactionableOutputOperator;
 
 /**
  * <p>CassandraOutputOperator class.</p>
  *
  * @since 1.0.3
  */
-public class CassandraOutputOperator extends  AbstractCassandraTransactionableOutputOperator<Integer>{
+public class CassandraOutputOperator extends AbstractCassandraTransactionableOutputOperator<Integer>
+{
 
   private int id = 0;
 
   @Override
-  protected PreparedStatement getUpdateCommand() {
+  protected PreparedStatement getUpdateCommand()
+  {
     String statement = "Insert into test.cassandra_operator(id, result) values (?,?);";
     return store.getSession().prepare(statement);
   }
 
   @Override
   protected Statement setStatementParameters(PreparedStatement updateCommand,
-      Integer tuple) throws DriverException {
+      Integer tuple) throws DriverException
+  {
     BoundStatement boundStmnt = new BoundStatement(updateCommand);
-    return boundStmnt.bind(id++,tuple);
+    return boundStmnt.bind(id++, tuple);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java
index 894cb75..ce0821c 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java
@@ -18,10 +18,12 @@
  */
 package com.datatorrent.benchmark.fs;
 
-import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
 import java.util.Arrays;
+
 import javax.validation.constraints.Min;
 
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
+
 /**
  * This output operator receives
  *
@@ -38,19 +40,20 @@ public class FSByteOutputOperator extends AbstractFileOutputOperator<byte[]>
   /**
    * The file a tuple is written out to is determined by modding the hashcode of the
    * tuple by the outputFileCount.
+   *
    * @param tuple The input tuple to write out.
    * @return The name of the file to write the tuple to.
    */
   @Override
   protected String getFileName(byte[] tuple)
   {
-    return ((Integer) (Arrays.hashCode(tuple) % outputFileCount)).toString();
+    return ((Integer)(Arrays.hashCode(tuple) % outputFileCount)).toString();
   }
 
   @Override
   protected byte[] getBytesForTuple(byte[] tuple)
   {
-    for(int counter = 0;
+    for (int counter = 0;
         counter < tuple.length;
         counter++) {
       tuple[counter] += 1;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java
index 8702ab8..7a63d18 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java
@@ -18,17 +18,20 @@
  */
 package com.datatorrent.benchmark.fs;
 
-import com.datatorrent.lib.testbench.RandomWordGenerator;
+import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Context.PortContext;
-import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.DAG;
+
+import com.datatorrent.api.StreamingApplication;
+
 import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.lib.counters.BasicCounters;
-import org.apache.commons.lang.mutable.MutableLong;
 
+import com.datatorrent.lib.counters.BasicCounters;
 
-import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.lib.testbench.RandomWordGenerator;
 
 /**
  * Application used to benchmark HDFS output operator
@@ -38,25 +41,28 @@ import org.apache.hadoop.conf.Configuration;
  * @since 0.9.4
  */
 
-@ApplicationAnnotation(name="HDFSOutputOperatorBenchmarkingApp")
+@ApplicationAnnotation(name = "HDFSOutputOperatorBenchmarkingApp")
 public class FSOutputOperatorBenchmark implements StreamingApplication
 {
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
     String filePath = "HDFSOutputOperatorBenchmarkingApp/"
-            + System.currentTimeMillis();
+        + System.currentTimeMillis();
 
     dag.setAttribute(DAG.STREAMING_WINDOW_SIZE_MILLIS, 1000);
 
     RandomWordGenerator wordGenerator = dag.addOperator("wordGenerator", RandomWordGenerator.class);
 
-    dag.getOperatorMeta("wordGenerator").getMeta(wordGenerator.output).getAttributes().put(PortContext.QUEUE_CAPACITY, 10000);
-    dag.getOperatorMeta("wordGenerator").getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT, 1);
+    dag.getOperatorMeta("wordGenerator").getMeta(wordGenerator.output)
+        .getAttributes().put(PortContext.QUEUE_CAPACITY, 10000);
+    dag.getOperatorMeta("wordGenerator").getAttributes()
+        .put(OperatorContext.APPLICATION_WINDOW_COUNT, 1);
 
     FSByteOutputOperator hdfsOutputOperator = dag.addOperator("hdfsOutputOperator", new FSByteOutputOperator());
     hdfsOutputOperator.setFilePath(filePath);
-    dag.getOperatorMeta("hdfsOutputOperator").getAttributes().put(OperatorContext.COUNTERS_AGGREGATOR, new BasicCounters.LongAggregator<MutableLong>());
+    dag.getOperatorMeta("hdfsOutputOperator").getAttributes()
+        .put(OperatorContext.COUNTERS_AGGREGATOR, new BasicCounters.LongAggregator<MutableLong>());
 
     dag.addStream("Generator2HDFSOutput", wordGenerator.output, hdfsOutputOperator.input);
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java
index 60be57d..95fa961 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java
@@ -30,16 +30,18 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
 
-import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator;
-import com.datatorrent.contrib.hive.HiveOperator;
-import com.datatorrent.contrib.hive.HiveStore;
-
 import com.datatorrent.api.Context.OperatorContext;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.InputOperator;
+
 import com.datatorrent.api.StreamingApplication;
+
 import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator;
+import com.datatorrent.contrib.hive.HiveOperator;
+import com.datatorrent.contrib.hive.HiveStore;
 
 /**
  * Application used to benchmark HIVE Insert operator
@@ -79,13 +81,14 @@ public class HiveInsertBenchmarkingApp implements StreamingApplication
   {
     HiveStore store = new HiveStore();
     store.setDatabaseUrl(conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl"));
-    store.setConnectionProperties(conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
+    store.setConnectionProperties(conf.get(
+        "dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
     store.setFilepath(conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.filepath"));
 
     try {
-      hiveInitializeDatabase(store, conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.tablename"));
-    }
-    catch (SQLException ex) {
+      hiveInitializeDatabase(store, conf.get(
+          "dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.tablename"));
+    } catch (SQLException ex) {
       LOG.debug(ex.getMessage());
     }
 
@@ -109,8 +112,9 @@ public class HiveInsertBenchmarkingApp implements StreamingApplication
   {
     hiveStore.connect();
     Statement stmt = hiveStore.getConnection().createStatement();
-    stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename + " (col1 string) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n'  \n"
-            + "STORED AS TEXTFILE ");
+    stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename
+        + " (col1 string) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n'  \n"
+        + "STORED AS TEXTFILE ");
     hiveStore.disconnect();
   }
 
@@ -171,8 +175,8 @@ public class HiveInsertBenchmarkingApp implements StreamingApplication
     {
 
       for (;
-              tupleCounter < tuplesPerWindow;
-              tupleCounter++) {
+          tupleCounter < tuplesPerWindow;
+          tupleCounter++) {
         String output = "2014-12-1" + random.nextInt(10) + "";
         outputString.emit(output);
       }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java
index cfbbfc5..98d9ce3 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java
@@ -24,23 +24,23 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map;
 
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
 
-import com.datatorrent.lib.testbench.RandomEventGenerator;
-
-import com.datatorrent.contrib.hive.*;
-
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
-
 import com.datatorrent.benchmark.RandomMapOutput;
 
+import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator;
+import com.datatorrent.contrib.hive.HiveOperator;
+import com.datatorrent.contrib.hive.HiveStore;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+
+
 /**
  * Application used to benchmark HIVE Map Insert operator
  * The DAG consists of random Event generator operator that is
@@ -61,12 +61,13 @@ public class HiveMapInsertBenchmarkingApp implements StreamingApplication
   {
     HiveStore store = new HiveStore();
     store.setDatabaseUrl(conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl"));
-    store.setConnectionProperties(conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
+    store.setConnectionProperties(conf.get(
+        "dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
     store.setFilepath(conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.filepath"));
     try {
-      hiveInitializeMapDatabase(store, conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.tablename"), ":");
-    }
-    catch (SQLException ex) {
+      hiveInitializeMapDatabase(store, conf.get(
+          "dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.tablename"), ":");
+    } catch (SQLException ex) {
       LOG.debug(ex.getMessage());
     }
     dag.setAttribute(DAG.STREAMING_WINDOW_SIZE_MILLIS, 1000);
@@ -90,13 +91,15 @@ public class HiveMapInsertBenchmarkingApp implements StreamingApplication
   /*
    * User can create table and specify data columns and partition columns in this function.
    */
-  public static void hiveInitializeMapDatabase(HiveStore hiveStore, String tablename, String delimiterMap) throws SQLException
+  public static void hiveInitializeMapDatabase(
+      HiveStore hiveStore, String tablename, String delimiterMap) throws SQLException
   {
     hiveStore.connect();
     Statement stmt = hiveStore.getConnection().createStatement();
-    stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename + " (col1 map<string,int>) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n'  \n"
-            + "MAP KEYS TERMINATED BY '" + delimiterMap + "' \n"
-            + "STORED AS TEXTFILE ");
+    stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename
+        + " (col1 map<string,int>) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n'  \n"
+        + "MAP KEYS TERMINATED BY '" + delimiterMap + "' \n"
+        + "STORED AS TEXTFILE ");
     hiveStore.disconnect();
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java
index 8239ea7..e147ad7 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java
@@ -18,11 +18,11 @@
  */
 package com.datatorrent.benchmark.kafka;
 
-import kafka.message.Message;
-
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.contrib.kafka.AbstractKafkaInputOperator;
 
+import kafka.message.Message;
+
 /**
  * This operator emits one constant message for each kafka message received.&nbsp;
  * So we can track the throughput by messages emitted per second in the stram platform.
@@ -38,7 +38,7 @@ public class BenchmarkKafkaInputOperator extends AbstractKafkaInputOperator
   /**
    * The output port on which messages are emitted.
    */
-  public transient DefaultOutputPort<String>  oport = new DefaultOutputPort<String>();
+  public transient DefaultOutputPort<String> oport = new DefaultOutputPort<String>();
 
   @Override
   protected void emitTuple(Message message)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
index 1126ac1..6353c37 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
@@ -18,7 +18,6 @@
  */
 package com.datatorrent.benchmark.kafka;
 
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
@@ -26,9 +25,6 @@ import java.util.Properties;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
 
 import javax.validation.constraints.Min;
 
@@ -37,21 +33,27 @@ import org.slf4j.LoggerFactory;
 
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultPartition;
+
 import com.datatorrent.api.InputOperator;
 import com.datatorrent.api.Operator.ActivationListener;
 import com.datatorrent.api.Partitioner;
 
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
 /**
  * This operator keep sending constant messages(1kb each) in {@link #threadNum} threads.&nbsp;
  * Messages are distributed evenly to partitions.
  * <p></p>
+ *
  * @displayName Benchmark Partitionable Kafka Output
  * @category Messaging
  * @tags output operator
- *
  * @since 0.9.3
  */
-public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<BenchmarkPartitionableKafkaOutputOperator>, InputOperator, ActivationListener<OperatorContext>
+public class BenchmarkPartitionableKafkaOutputOperator implements
+    Partitioner<BenchmarkPartitionableKafkaOutputOperator>, InputOperator, ActivationListener<OperatorContext>
 {
 
   private static final Logger logger = LoggerFactory.getLogger(BenchmarkPartitionableKafkaOutputOperator.class);
@@ -78,7 +80,8 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
 
   private int stickyKey = 0;
 
-  private transient Runnable r = new Runnable() {
+  private transient Runnable r = new Runnable()
+  {
 
     Producer<String, String> producer = null;
 
@@ -101,12 +104,12 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
       }
       long k = 0;
 
-      while (k<msgsSecThread || !controlThroughput) {
+      while (k < msgsSecThread || !controlThroughput) {
         long key = (stickyKey >= 0 ? stickyKey : k);
         k++;
         producer.send(new KeyedMessage<String, String>(topic, "" + key, new String(constantMsg)));
-        if(k==Long.MAX_VALUE){
-          k=0;
+        if (k == Long.MAX_VALUE) {
+          k = 0;
         }
       }
     }
@@ -152,10 +155,12 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
    * {@inheritDoc}
    */
   @Override
-  public Collection<Partition<BenchmarkPartitionableKafkaOutputOperator>> definePartitions(Collection<Partition<BenchmarkPartitionableKafkaOutputOperator>> partitions, PartitioningContext context)
+  public Collection<Partition<BenchmarkPartitionableKafkaOutputOperator>> definePartitions(
+      Collection<Partition<BenchmarkPartitionableKafkaOutputOperator>> partitions, PartitioningContext context)
   {
 
-    ArrayList<Partition<BenchmarkPartitionableKafkaOutputOperator>> newPartitions = new ArrayList<Partitioner.Partition<BenchmarkPartitionableKafkaOutputOperator>>(partitionCount);
+    ArrayList<Partition<BenchmarkPartitionableKafkaOutputOperator>> newPartitions =
+        new ArrayList<Partitioner.Partition<BenchmarkPartitionableKafkaOutputOperator>>(partitionCount);
 
     for (int i = 0; i < partitionCount; i++) {
       BenchmarkPartitionableKafkaOutputOperator bpkoo = new BenchmarkPartitionableKafkaOutputOperator();
@@ -163,7 +168,8 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
       bpkoo.setTopic(topic);
       bpkoo.setBrokerList(brokerList);
       bpkoo.setStickyKey(i);
-      Partition<BenchmarkPartitionableKafkaOutputOperator> p = new DefaultPartition<BenchmarkPartitionableKafkaOutputOperator>(bpkoo);
+      Partition<BenchmarkPartitionableKafkaOutputOperator> p =
+          new DefaultPartition<BenchmarkPartitionableKafkaOutputOperator>(bpkoo);
       newPartitions.add(p);
     }
     return newPartitions;
@@ -176,20 +182,17 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
     logger.info("Activate the benchmark kafka output operator .... ");
     constantMsg = new byte[msgSize];
     for (int i = 0; i < constantMsg.length; i++) {
-      constantMsg[i] = (byte) ('a' + i%26);
+      constantMsg[i] = (byte)('a' + i % 26);
     }
 
-
     for (int i = 0; i < threadNum; i++) {
-      if(controlThroughput){
+      if (controlThroughput) {
         ses.scheduleAtFixedRate(r, 0, 1, TimeUnit.SECONDS);
-      }
-      else {
+      } else {
         ses.submit(r);
       }
     }
 
-
   }
 
   @Override
@@ -268,7 +271,4 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
     this.stickyKey = stickyKey;
   }
 
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java
index 159ee60..ead6c66 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java
@@ -18,24 +18,27 @@
  */
 package com.datatorrent.benchmark.kafka;
 
-import com.google.common.collect.Sets;
 import java.util.Properties;
 
+
 import org.apache.hadoop.conf.Configuration;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Context.PortContext;
+
+import com.datatorrent.api.DAG;
+
 import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultInputPort;
+
+import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.contrib.kafka.HighlevelKafkaConsumer;
 import com.datatorrent.contrib.kafka.KafkaConsumer;
 import com.datatorrent.contrib.kafka.SimpleKafkaConsumer;
 
-
 /**
  * The stream app to test the benckmark of kafka
  * You can set the property file to make it using either {@link SimpleKafkaConsumer} or {@link HighlevelKafkaConsumer}
@@ -43,13 +46,14 @@ import com.datatorrent.contrib.kafka.SimpleKafkaConsumer;
  *
  * @since 0.9.3
  */
-@ApplicationAnnotation(name="KafkaInputBenchmark")
+@ApplicationAnnotation(name = "KafkaInputBenchmark")
 public class KafkaInputBenchmark implements StreamingApplication
 {
 
   public static class CollectorModule extends BaseOperator
   {
-    public final transient DefaultInputPort<String> inputPort = new DefaultInputPort<String>() {
+    public final transient DefaultInputPort<String> inputPort = new DefaultInputPort<String>()
+    {
 
       @Override
       public void process(String arg0)
@@ -65,12 +69,10 @@ public class KafkaInputBenchmark implements StreamingApplication
     dag.setAttribute(DAG.APPLICATION_NAME, "KafkaInputOperatorPartitionDemo");
     BenchmarkKafkaInputOperator bpkio = new BenchmarkKafkaInputOperator();
 
-
     String type = conf.get("kafka.consumertype", "simple");
 
     KafkaConsumer consumer = null;
 
-
     if (type.equals("highlevel")) {
       // Create template high-level consumer
 
@@ -96,7 +98,6 @@ public class KafkaInputBenchmark implements StreamingApplication
     dag.setAttribute(bpkio, OperatorContext.COUNTERS_AGGREGATOR, new KafkaConsumer.KafkaMeterStatsAggregator());
 //    dag.setAttribute(bpkio, OperatorContext.STATS_LISTENER, KafkaMeterStatsListener.class);
 
-
   }
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java
index ca1de48..0dd4352 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java
@@ -29,7 +29,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
  *
  * @since 0.9.3
  */
-@ApplicationAnnotation(name="KafkaOutputBenchmark")
+@ApplicationAnnotation(name = "KafkaOutputBenchmark")
 public class KafkaOutputBenchmark implements StreamingApplication
 {
 
@@ -37,7 +37,8 @@ public class KafkaOutputBenchmark implements StreamingApplication
   public void populateDAG(DAG dag, Configuration conf)
   {
     dag.setAttribute(DAG.APPLICATION_NAME, "KafkaOutputBenchmark");
-    BenchmarkPartitionableKafkaOutputOperator bpkoo = dag.addOperator("KafkaBenchmarkProducer", BenchmarkPartitionableKafkaOutputOperator.class);
+    BenchmarkPartitionableKafkaOutputOperator bpkoo = dag.addOperator(
+        "KafkaBenchmarkProducer", BenchmarkPartitionableKafkaOutputOperator.class);
     bpkoo.setBrokerList(conf.get("kafka.brokerlist"));
     bpkoo.setPartitionCount(2);
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java
index 1d22613..65601d5 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java
@@ -21,7 +21,6 @@ package com.datatorrent.benchmark.kafka;
 import kafka.producer.Partitioner;
 import kafka.utils.VerifiableProperties;
 
-
 /**
  * A simple partitioner class for test purpose
  * Key is a int string
@@ -32,12 +31,14 @@ import kafka.utils.VerifiableProperties;
  */
 public class KafkaTestPartitioner implements Partitioner
 {
-  public KafkaTestPartitioner (VerifiableProperties props) {
+  public KafkaTestPartitioner(VerifiableProperties props)
+  {
 
   }
+
   @Override
   public int partition(Object key, int num_Partitions)
   {
-    return Integer.parseInt((String)key)%num_Partitions;
+    return Integer.parseInt((String)key) % num_Partitions;
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java b/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java
index bc23404..b86cd01 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java
@@ -18,17 +18,20 @@
  */
 package com.datatorrent.benchmark.script;
 
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.DAG.Locality;
+
+import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
+
 import com.datatorrent.benchmark.RandomMapOutput;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
 import com.datatorrent.contrib.ruby.RubyOperator;
-import com.datatorrent.lib.testbench.RandomEventGenerator;
 
-import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
 
 /**
  *

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
index 07ab02e..7c45106 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
@@ -169,7 +169,8 @@ public class SpillableTestOperator extends BaseOperator implements Operator.Chec
     long countInPeriod = totalCount - lastTotalCount;
     long timeInPeriod = System.currentTimeMillis() - lastLogTime;
     long totalTime = System.currentTimeMillis() - beginTime;
-    logger.info("Statistics: total count: {}; period count: {}; total rate (per second): {}; period rate (per second): {}",
+    logger.info(
+        "Statistics: total count: {}; period count: {}; total rate (per second): {}; period rate (per second): {}",
         totalCount, countInPeriod, totalCount * 1000 / totalTime, countInPeriod * 1000 / timeInPeriod);
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
index ae5ba40..2dc6f0d 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
@@ -98,7 +98,8 @@ public class ManagedStateBenchmarkApp implements StreamingApplication
 
   public static class TestGenerator extends BaseOperator implements InputOperator
   {
-    public final transient DefaultOutputPort<KeyValPair<byte[], byte[]>> data = new DefaultOutputPort<KeyValPair<byte[], byte[]>>();
+    public final transient DefaultOutputPort<KeyValPair<byte[], byte[]>> data =
+        new DefaultOutputPort<KeyValPair<byte[], byte[]>>();
     int emitBatchSize = 1000;
     byte[] val = ByteBuffer.allocate(1000).putLong(1234).array();
     int rate = 20000;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
index 74ba658..60a775c 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
@@ -69,7 +69,8 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
   private ExecMode execMode = ExecMode.INSERT;
   private int timeRange = 1000 * 60;
 
-  public final transient DefaultInputPort<KeyValPair<byte[], byte[]>> input = new DefaultInputPort<KeyValPair<byte[], byte[]>>()
+  public final transient DefaultInputPort<KeyValPair<byte[], byte[]>> input =
+      new DefaultInputPort<KeyValPair<byte[], byte[]>>()
   {
     @Override
     public void process(KeyValPair<byte[], byte[]> tuple)
@@ -172,7 +173,8 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
   private final int taskBarrier = 100000;
 
   /**
-   * This method first send request of get to the state manager, then handle all the task(get) which already done and update the value.
+   * This method first send request of get to the state manager,
+   * then handle all the task(get) which already done and update the value.
    * @param tuple
    */
   private void updateAsync(KeyValPair<byte[], byte[]> tuple)
@@ -251,7 +253,8 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
     long spentTime = now - statisticsBeginTime;
     long totalSpentTime = now - applicationBeginTime;
     totalTupleCount += tupleCount;
-    logger.info("Windows: {}; Time Spent: {}, Processed tuples: {}, rate per second: {}; total rate: {}", windowCountPerStatistics, spentTime, tupleCount, tupleCount * 1000 / spentTime,
+    logger.info("Windows: {}; Time Spent: {}, Processed tuples: {}, rate per second: {}; total rate: {}",
+        windowCountPerStatistics, spentTime, tupleCount, tupleCount * 1000 / spentTime,
         totalTupleCount * 1000 / totalSpentTime);
 
     statisticsBeginTime = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java
index e0ee160..b0b7314 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java
@@ -18,13 +18,14 @@
  */
 package com.datatorrent.benchmark.stream;
 
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.stream.DevNullCounter;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  *
@@ -56,11 +57,11 @@ public class DevNullCounterBenchmark implements StreamingApplication
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
-   // RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
-   // rand.setMinvalue(0);
-   // rand.setMaxvalue(999999);
-   // rand.setTuplesBlastIntervalMillis(50);
-   // dag.getMeta(rand).getMeta(rand.integer_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+    // RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
+    // rand.setMinvalue(0);
+    // rand.setMaxvalue(999999);
+    // rand.setTuplesBlastIntervalMillis(50);
+    // dag.getMeta(rand).getMeta(rand.integer_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
     IntegerOperator intInput = dag.addOperator("intInput", new IntegerOperator());
     DevNullCounter oper = dag.addOperator("oper", new DevNullCounter());
     dag.getMeta(oper).getMeta(oper.data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java
index ff6ed76..c716206 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java
@@ -35,37 +35,42 @@ public class IntegerOperator implements InputOperator
    * Output port which emits integer.
    */
   public final transient DefaultOutputPort<Integer> integer_data = new DefaultOutputPort<Integer>();
+
   @Override
   public void emitTuples()
   {
     Integer i = 21;
-    for(int j=0;j<1000;j++){
-    integer_data.emit(i);
+    for (int j = 0; j < 1000; j++) {
+      integer_data.emit(i);
     }
   }
 
   @Override
   public void beginWindow(long windowId)
   {
-    //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    //throw new UnsupportedOperationException("Not supported yet.");
+    // To change body of generated methods, choose Tools | Templates.
   }
 
   @Override
   public void endWindow()
   {
-    //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    //throw new UnsupportedOperationException("Not supported yet.");
+    // To change body of generated methods, choose Tools | Templates.
   }
 
   @Override
   public void setup(OperatorContext context)
   {
-    //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    //throw new UnsupportedOperationException("Not supported yet.");
+    // To change body of generated methods, choose Tools | Templates.
   }
 
   @Override
   public void teardown()
   {
-    //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    //throw new UnsupportedOperationException("Not supported yet.");
+    // To change body of generated methods, choose Tools | Templates.
   }
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java
index 951b44b..2e5bcf9 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java
@@ -18,6 +18,8 @@
  */
 package com.datatorrent.benchmark.stream;
 
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
@@ -25,7 +27,6 @@ import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.stream.DevNull;
 import com.datatorrent.lib.stream.StreamDuplicater;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Benchmark App for StreamDuplicater Operator.
@@ -36,25 +37,25 @@ import org.apache.hadoop.conf.Configuration;
 @ApplicationAnnotation(name = "StreamDuplicaterApp")
 public class StreamDuplicaterApp implements StreamingApplication
 {
-   private final Locality locality = null;
-   public static final int QUEUE_CAPACITY = 16 * 1024;
+  private final Locality locality = null;
+  public static final int QUEUE_CAPACITY = 16 * 1024;
 
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
-   // RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
-   // rand.setMinvalue(0);
-   // rand.setMaxvalue(999999);
-   // rand.setTuplesBlastIntervalMillis(50);
-   // dag.getMeta(rand).getMeta(rand.integer_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+    // RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
+    // rand.setMinvalue(0);
+    // rand.setMaxvalue(999999);
+    // rand.setTuplesBlastIntervalMillis(50);
+    // dag.getMeta(rand).getMeta(rand.integer_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
     IntegerOperator intInput = dag.addOperator("intInput", new IntegerOperator());
     StreamDuplicater stream = dag.addOperator("stream", new StreamDuplicater());
     dag.getMeta(stream).getMeta(stream.data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
     dag.addStream("streamdup1", intInput.integer_data, stream.data).setLocality(locality);
     DevNull<Integer> dev1 = dag.addOperator("dev1", new DevNull());
     DevNull<Integer> dev2 = dag.addOperator("dev2", new DevNull());
-    dag.addStream("streamdup2",stream.out1,dev1.data).setLocality(locality);
-    dag.addStream("streamdup3",stream.out2,dev2.data).setLocality(locality);
+    dag.addStream("streamdup2", stream.out1, dev1.data).setLocality(locality);
+    dag.addStream("streamdup3", stream.out2, dev2.data).setLocality(locality);
 
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java
index d90320c..bb1d081 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java
@@ -18,6 +18,8 @@
  */
 package com.datatorrent.benchmark.stream;
 
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
@@ -25,7 +27,6 @@ import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.benchmark.WordCountOperator;
 import com.datatorrent.lib.stream.StreamMerger;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Benchmark App for StreamMerge Operator.
@@ -46,7 +47,7 @@ public class StreamMergeApp implements StreamingApplication
     StreamMerger stream = dag.addOperator("stream", new StreamMerger());
     dag.getMeta(stream).getMeta(stream.data1).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
     dag.getMeta(stream).getMeta(stream.data2).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
-    dag.addStream("streammerge1", intInput.integer_data, stream.data1,stream.data2).setLocality(locality);
+    dag.addStream("streammerge1", intInput.integer_data, stream.data1, stream.data2).setLocality(locality);
 
     WordCountOperator<Integer> counter = dag.addOperator("counter", new WordCountOperator<Integer>());
     dag.getMeta(counter).getMeta(counter.input).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java
index 419de18..b1ddbee 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java
@@ -18,6 +18,11 @@
  */
 package com.datatorrent.benchmark.testbench;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
@@ -25,9 +30,6 @@ import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.stream.DevNull;
 import com.datatorrent.lib.testbench.EventClassifier;
-import java.util.ArrayList;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Benchmark App for EventClassifier Operator.
@@ -75,7 +77,8 @@ public class EventClassifierApp implements StreamingApplication
     eventClassifier.setKeyMap(keymap);
     eventClassifier.setOperationReplace();
     eventClassifier.setKeyWeights(wmap);
-    dag.getMeta(eventClassifier).getMeta(eventClassifier.data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+    dag.getMeta(eventClassifier).getMeta(eventClassifier.data).getAttributes()
+        .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
     dag.addStream("eventtest1", hmapOper.hmap_data, eventClassifier.event).setLocality(locality);
     DevNull<HashMap<String, Double>> dev = dag.addOperator("dev", new DevNull());
     dag.addStream("eventtest2", eventClassifier.data, dev.data).setLocality(locality);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java
index a49b30e..5fe478b 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java
@@ -18,6 +18,10 @@
  */
 package com.datatorrent.benchmark.testbench;
 
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
@@ -26,8 +30,6 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.benchmark.WordCountOperator;
 import com.datatorrent.benchmark.stream.IntegerOperator;
 import com.datatorrent.lib.testbench.EventClassifierNumberToHashDouble;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Benchmark App for EventClassifierNumberToHashDouble Operator.
@@ -44,10 +46,14 @@ public class EventClassifierNumberToHashDoubleApp implements StreamingApplicatio
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
-    WordCountOperator<HashMap<String, Double>> counterString = dag.addOperator("counterString", new WordCountOperator<HashMap<String, Double>>());
-    dag.getMeta(counterString).getMeta(counterString.input).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
-    EventClassifierNumberToHashDouble eventClassify = dag.addOperator("eventClassify", new EventClassifierNumberToHashDouble());
-    dag.getMeta(eventClassify).getMeta(eventClassify.data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+    WordCountOperator<HashMap<String, Double>> counterString =
+        dag.addOperator("counterString", new WordCountOperator<HashMap<String, Double>>());
+    dag.getMeta(counterString).getMeta(counterString.input).getAttributes()
+        .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+    EventClassifierNumberToHashDouble eventClassify =
+        dag.addOperator("eventClassify", new EventClassifierNumberToHashDouble());
+    dag.getMeta(eventClassify).getMeta(eventClassify.data)
+        .getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
     IntegerOperator intInput = dag.addOperator("intInput", new IntegerOperator());
     dag.addStream("eventclassifier2", intInput.integer_data, eventClassify.event).setLocality(locality);
     dag.addStream("eventclassifier1", eventClassify.data, counterString.input).setLocality(locality);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java
index 3025c7e..8f28ae6 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java
@@ -18,6 +18,10 @@
  */
 package com.datatorrent.benchmark.testbench;
 
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
@@ -25,8 +29,6 @@ import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.stream.DevNull;
 import com.datatorrent.lib.testbench.EventGenerator;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Benchmark App for EventGenerator Operator.
@@ -44,7 +46,8 @@ public class EventGeneratorApp implements StreamingApplication
   public void populateDAG(DAG dag, Configuration conf)
   {
     EventGenerator eventGenerator = dag.addOperator("eventGenerator", new EventGenerator());
-    dag.getMeta(eventGenerator).getMeta(eventGenerator.count).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+    dag.getMeta(eventGenerator).getMeta(eventGenerator.count).getAttributes()
+        .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
 
     DevNull<String> devString = dag.addOperator("devString", new DevNull());
     DevNull<HashMap<String, Double>> devMap = dag.addOperator("devMap", new DevNull());

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java
index ea05d07..e562224 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java
@@ -18,15 +18,17 @@
  */
 package com.datatorrent.benchmark.testbench;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.stream.DevNull;
 import com.datatorrent.lib.testbench.EventIncrementer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Benchmark App for EventIncrementer Operator.
@@ -39,6 +41,7 @@ public class EventIncrementerApp implements StreamingApplication
 {
   private final Locality locality = null;
   public static final int QUEUE_CAPACITY = 16 * 1024;
+
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
@@ -55,15 +58,14 @@ public class EventIncrementerApp implements StreamingApplication
     eventInc.setKeylimits(keys, low, high);
     eventInc.setDelta(1);
     HashMapOperator hmapOper = dag.addOperator("hmapOper", new HashMapOperator());
-    dag.addStream("eventIncInput1",hmapOper.hmapList_data,eventInc.seed);
-    dag.addStream("eventIncInput2",hmapOper.hmapMap_data,eventInc.increment);
-    DevNull<HashMap<String,Integer>> dev1= dag.addOperator("dev1", new DevNull());
-    DevNull<HashMap<String,String>> dev2= dag.addOperator("dev2", new DevNull());
-    dag.addStream("eventIncOutput1",eventInc.count,dev1.data).setLocality(locality);
-    dag.addStream("eventIncOutput2",eventInc.data,dev2.data).setLocality(locality);
+    dag.addStream("eventIncInput1", hmapOper.hmapList_data, eventInc.seed);
+    dag.addStream("eventIncInput2", hmapOper.hmapMap_data, eventInc.increment);
+    DevNull<HashMap<String, Integer>> dev1 = dag.addOperator("dev1", new DevNull());
+    DevNull<HashMap<String, String>> dev2 = dag.addOperator("dev2", new DevNull());
+    dag.addStream("eventIncOutput1", eventInc.count, dev1.data).setLocality(locality);
+    dag.addStream("eventIncOutput2", eventInc.data, dev2.data).setLocality(locality);
 
   }
 
-
 }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java
index 915e6f0..ea2943f 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java
@@ -18,15 +18,17 @@
  */
 package com.datatorrent.benchmark.testbench;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.stream.DevNull;
 import com.datatorrent.lib.testbench.FilterClassifier;
-import java.util.ArrayList;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Benchmark App for FilterClassifier Operator.
@@ -39,6 +41,7 @@ public class FilterClassifierApp implements StreamingApplication
 {
   private final Locality locality = null;
   public static final int QUEUE_CAPACITY = 16 * 1024;
+
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
@@ -80,9 +83,9 @@ public class FilterClassifierApp implements StreamingApplication
     filter.setTotalFilter(100);
 
     HashMapOperator hmapOper = dag.addOperator("hmapOper", new HashMapOperator());
-    DevNull<HashMap<String,Double>> dev = dag.addOperator("dev",  new DevNull());
-    dag.addStream("filter1",hmapOper.hmap_data,filter.data).setLocality(locality);
-    dag.addStream("filer2",filter.filter,dev.data).setLocality(locality);
+    DevNull<HashMap<String, Double>> dev = dag.addOperator("dev", new DevNull());
+    dag.addStream("filter1", hmapOper.hmap_data, filter.data).setLocality(locality);
+    dag.addStream("filer2", filter.filter, dev.data).setLocality(locality);
 
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java
index c3d996e..52c0bed 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java
@@ -18,15 +18,17 @@
  */
 package com.datatorrent.benchmark.testbench;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.stream.DevNull;
 import com.datatorrent.lib.testbench.FilteredEventClassifier;
-import java.util.ArrayList;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Benchmark App for FilteredEventClassifier Operator.