You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/01/22 19:24:05 UTC

[1/3] apex-malhar git commit: fixing all checkstyle violations, delete maxAllowedViolations from pom

Repository: apex-malhar
Updated Branches:
  refs/heads/master 623b803f5 -> e22ea0de1


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/HashMapOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/HashMapOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/HashMapOperator.java
index e2a94ef..29cd079 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/HashMapOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/HashMapOperator.java
@@ -18,15 +18,17 @@
  */
 package com.datatorrent.benchmark.testbench;
 
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.lib.testbench.EventGenerator;
 import java.util.ArrayList;
 import java.util.HashMap;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.lib.testbench.EventGenerator;
+
 /**
  * HashMap Input Operator used as a helper in testbench benchmarking apps.
  *
@@ -36,11 +38,15 @@ public class HashMapOperator implements InputOperator
 {
   private String keys = null;
   private static final Logger logger = LoggerFactory.getLogger(EventGenerator.class);
-  private String[] keysArray = {"a","b","c","d"};
-  public final transient DefaultOutputPort<HashMap<String, Double>> hmap_data = new DefaultOutputPort<HashMap<String, Double>>();
-  public final transient DefaultOutputPort<HashMap<String, ArrayList<Integer>>> hmapList_data = new DefaultOutputPort<HashMap<String, ArrayList<Integer>>>();
-  public final transient DefaultOutputPort<HashMap<String, HashMap<String, Integer>>> hmapMap_data = new DefaultOutputPort<HashMap<String, HashMap<String, Integer>>>();
-  public final transient DefaultOutputPort<HashMap<String, Integer>> hmapInt_data = new DefaultOutputPort<HashMap<String, Integer>>();
+  private String[] keysArray = {"a", "b", "c", "d"};
+  public final transient DefaultOutputPort<HashMap<String, Double>> hmap_data =
+      new DefaultOutputPort<HashMap<String, Double>>();
+  public final transient DefaultOutputPort<HashMap<String, ArrayList<Integer>>> hmapList_data =
+      new DefaultOutputPort<HashMap<String, ArrayList<Integer>>>();
+  public final transient DefaultOutputPort<HashMap<String, HashMap<String, Integer>>> hmapMap_data =
+      new DefaultOutputPort<HashMap<String, HashMap<String, Integer>>>();
+  public final transient DefaultOutputPort<HashMap<String, Integer>> hmapInt_data =
+      new DefaultOutputPort<HashMap<String, Integer>>();
   private int numTuples = 1000;
   private String seed = "a";
   private int numKeys = 2;
@@ -89,7 +95,7 @@ public class HashMapOperator implements InputOperator
         for (int j = 0; j < numKeys; j++) {
           hmapMapTemp.put(keysArray[j], 100 * j);
         }
-         for (int j = 0; j < numKeys; j++) {
+        for (int j = 0; j < numKeys; j++) {
           hmapMap.put(keysArray[j], hmapMapTemp);
         }
         hmapMap_data.emit(hmapMap);
@@ -107,7 +113,7 @@ public class HashMapOperator implements InputOperator
     }
 
     if (hmapInt_data.isConnected()) {
-        for (int i = 0; i < numTuples; i++) {
+      for (int i = 0; i < numTuples; i++) {
         HashMap<String, Integer> hmapMapTemp = new HashMap<String, Integer>();
         for (int j = 0; j < numKeys; j++) {
           hmapMapTemp.put(keysArray[j], 100 * j);
@@ -120,7 +126,8 @@ public class HashMapOperator implements InputOperator
   @Override
   public void beginWindow(long windowId)
   {
-    // throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    // throw new UnsupportedOperationException("Not supported yet.");
+    // To change body of generated methods, choose Tools | Templates.
   }
 
   @Override
@@ -132,13 +139,15 @@ public class HashMapOperator implements InputOperator
   @Override
   public void setup(OperatorContext context)
   {
-    // throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    // throw new UnsupportedOperationException("Not supported yet.");
+    // To change body of generated methods, choose Tools | Templates.
   }
 
   @Override
   public void teardown()
   {
-    // throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    // throw new UnsupportedOperationException("Not supported yet.");
+    // To change body of generated methods, choose Tools | Templates.
   }
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/RandomEventGeneratorApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/RandomEventGeneratorApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/RandomEventGeneratorApp.java
index da757d9..df5b11e 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/RandomEventGeneratorApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/RandomEventGeneratorApp.java
@@ -18,13 +18,14 @@
  */
 package com.datatorrent.benchmark.testbench;
 
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.stream.DevNull;
 import com.datatorrent.lib.testbench.RandomEventGenerator;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Benchmark App for RandomEventGenerator Operator.
@@ -37,14 +38,15 @@ public class RandomEventGeneratorApp implements StreamingApplication
 {
   private final Locality locality = null;
   public static final int QUEUE_CAPACITY = 16 * 1024;
+
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
     RandomEventGenerator random = dag.addOperator("random", new RandomEventGenerator());
     DevNull<Integer> dev1 = dag.addOperator("dev1", new DevNull());
     DevNull<String> dev2 = dag.addOperator("dev2", new DevNull());
-    dag.addStream("random1",random.integer_data,dev1.data).setLocality(locality);
-    dag.addStream("random2",random.string_data,dev2.data).setLocality(locality);
+    dag.addStream("random1", random.integer_data, dev1.data).setLocality(locality);
+    dag.addStream("random2", random.string_data, dev2.data).setLocality(locality);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/SeedEventGeneratorApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/SeedEventGeneratorApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/SeedEventGeneratorApp.java
index db18937..faafcbf 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/SeedEventGeneratorApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/SeedEventGeneratorApp.java
@@ -18,6 +18,10 @@
  */
 package com.datatorrent.benchmark.testbench;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
@@ -26,9 +30,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.stream.DevNull;
 import com.datatorrent.lib.testbench.SeedEventGenerator;
 import com.datatorrent.lib.util.KeyValPair;
-import java.util.ArrayList;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
+
 
 /**
  * Benchmark App for SeedEventGenerator Operator.
@@ -55,10 +57,12 @@ public class SeedEventGeneratorApp implements StreamingApplication
     DevNull<HashMap<String, String>> devVal = dag.addOperator("devVal", new DevNull<HashMap<String, String>>());
     DevNull<HashMap<String, ArrayList<Integer>>> devList = dag.addOperator("devList", new DevNull());
 
-    dag.getMeta(seedEvent).getMeta(seedEvent.string_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+    dag.getMeta(seedEvent).getMeta(seedEvent.string_data)
+        .getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
     dag.addStream("SeedEventGeneratorString", seedEvent.string_data, devString.data).setLocality(locality);
 
-    dag.getMeta(seedEvent).getMeta(seedEvent.keyvalpair_list).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+    dag.getMeta(seedEvent).getMeta(seedEvent.keyvalpair_list).getAttributes()
+        .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
     dag.addStream("SeedEventGeneratorKeyVal", seedEvent.keyvalpair_list, devKeyVal.data).setLocality(locality);
 
     dag.getMeta(seedEvent).getMeta(seedEvent.val_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/ThroughputCounterApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/ThroughputCounterApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/ThroughputCounterApp.java
index 5b66b54..d6e762e 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/ThroughputCounterApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/ThroughputCounterApp.java
@@ -18,14 +18,16 @@
  */
 package com.datatorrent.benchmark.testbench;
 
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.stream.DevNull;
 import com.datatorrent.lib.testbench.ThroughputCounter;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Benchmark App for ThroughputCounter Operator.
@@ -38,14 +40,15 @@ public class ThroughputCounterApp implements StreamingApplication
 {
   public static final int QUEUE_CAPACITY = 16 * 1024;
   private final Locality locality = null;
+
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
     ThroughputCounter counter = dag.addOperator("counter", new ThroughputCounter());
     HashMapOperator oper = dag.addOperator("oper", new HashMapOperator());
-    DevNull<HashMap<String,Number>> dev = dag.addOperator("dev", new DevNull());
-    dag.addStream("count1",oper.hmapInt_data,counter.data).setLocality(locality);
-    dag.addStream("count2",counter.count,dev.data).setLocality(locality);
+    DevNull<HashMap<String, Number>> dev = dag.addOperator("dev", new DevNull());
+    dag.addStream("count1", oper.hmapInt_data, counter.data).setLocality(locality);
+    dag.addStream("count2", counter.count, dev.data).setLocality(locality);
 
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
index 64af9f9..7250e74 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
@@ -50,7 +50,8 @@ import com.datatorrent.benchmark.window.WindowedOperatorBenchmarkApp.WindowedGen
 import com.datatorrent.lib.fileaccess.TFileImpl;
 import com.datatorrent.lib.stream.DevNull;
 
-public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O extends AbstractWindowedOperator> implements StreamingApplication
+public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O extends AbstractWindowedOperator>
+    implements StreamingApplication
 {
   protected static final String PROP_STORE_PATH = "dt.application.WindowedOperatorBenchmark.storeBasePath";
   protected static final String DEFAULT_BASE_PATH = "WindowedOperatorBenchmark/Store";
@@ -80,7 +81,8 @@ public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O
 
 //    WatermarkGenerator watermarkGenerator = new WatermarkGenerator();
 //    dag.addOperator("WatermarkGenerator", watermarkGenerator);
-//    dag.addStream("Control", watermarkGenerator.control, windowedOperator.controlInput).setLocality(Locality.CONTAINER_LOCAL);
+//    dag.addStream("Control", watermarkGenerator.control, windowedOperator.controlInput)
+//      .setLocality(Locality.CONTAINER_LOCAL);
 
     DevNull output = dag.addOperator("output", new DevNull());
     dag.addStream("output", windowedOperator.output, output.data).setLocality(Locality.CONTAINER_LOCAL);
@@ -112,7 +114,8 @@ public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O
       windowedOperator.setAllowedLateness(Duration.millis(ALLOWED_LATENESS));
       windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.standardMinutes(1)));
       //accumulating mode
-      windowedOperator.setTriggerOption(TriggerOption.AtWatermark().withEarlyFiringsAtEvery(Duration.standardSeconds(1)).accumulatingFiredPanes().firingOnlyUpdatedPanes());
+      windowedOperator.setTriggerOption(TriggerOption.AtWatermark()
+          .withEarlyFiringsAtEvery(Duration.standardSeconds(1)).accumulatingFiredPanes().firingOnlyUpdatedPanes());
       windowedOperator.setFixedWatermark(30000);
       //windowedOperator.setTriggerOption(TriggerOption.AtWatermark());
 
@@ -153,7 +156,6 @@ public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O
     return basePath;
   }
 
-
   public static class TestStatsListener implements StatsListener, Serializable
   {
     private static final Logger LOG = LoggerFactory.getLogger(TestStatsListener.class);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
index 5a9c955..19df8fd 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
@@ -41,7 +41,8 @@ import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.lib.fileaccess.TFileImpl;
 import com.datatorrent.lib.util.KeyValPair;
 
-public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchmarkApp<KeyedWindowedOperatorBenchmarkApp.KeyedWindowedGenerator, KeyedWindowedOperatorBenchmarkApp.MyKeyedWindowedOperator>
+public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchmarkApp<
+    KeyedWindowedOperatorBenchmarkApp.KeyedWindowedGenerator, KeyedWindowedOperatorBenchmarkApp.MyKeyedWindowedOperator>
 {
   public KeyedWindowedOperatorBenchmarkApp()
   {
@@ -58,7 +59,8 @@ public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorB
   }
 
   @Override
-  protected void setUpdatedKeyStorage(MyKeyedWindowedOperator windowedOperator, Configuration conf, SpillableComplexComponentImpl sccImpl)
+  protected void setUpdatedKeyStorage(MyKeyedWindowedOperator windowedOperator,
+      Configuration conf, SpillableComplexComponentImpl sccImpl)
   {
     windowedOperator.setUpdatedKeyStorage(createUpdatedDataStorage(conf, sccImpl));
   }
@@ -107,7 +109,8 @@ public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorB
     }
   }
 
-  protected static class KeyedWindowedGenerator extends AbstractGenerator<Tuple.TimestampedTuple<KeyValPair<String, Long>>>
+  protected static class KeyedWindowedGenerator extends
+      AbstractGenerator<Tuple.TimestampedTuple<KeyValPair<String, Long>>>
   {
     @Override
     protected TimestampedTuple<KeyValPair<String, Long>> generateNextTuple()

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java
index 98275ce..d96b453 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java
@@ -35,7 +35,8 @@ import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 
 @ApplicationAnnotation(name = "WindowedOperatorBenchmark")
-public class WindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchmarkApp<WindowedOperatorBenchmarkApp.WindowedGenerator, WindowedOperatorBenchmarkApp.MyWindowedOperator>
+public class WindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchmarkApp<
+    WindowedOperatorBenchmarkApp.WindowedGenerator, WindowedOperatorBenchmarkApp.MyWindowedOperator>
 {
   public WindowedOperatorBenchmarkApp()
   {
@@ -49,7 +50,8 @@ public class WindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchm
     @Override
     protected TimestampedTuple<Long> generateNextTuple()
     {
-      return new Tuple.TimestampedTuple<Long>(System.currentTimeMillis() - random.nextInt(120000), (long)random.nextInt(100));
+      return new Tuple.TimestampedTuple<Long>(System.currentTimeMillis() - random.nextInt(120000),
+          (long)random.nextInt(100));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/ApplicationFixedTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/ApplicationFixedTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/ApplicationFixedTest.java
index 1d76855..cd8a3ec 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/ApplicationFixedTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/ApplicationFixedTest.java
@@ -18,16 +18,17 @@
  */
 package com.datatorrent.benchmark;
 
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.Context.PortContext;
-
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+
 /**
  * Test the DAG declaration in local mode.
  */
@@ -40,8 +41,10 @@ public class ApplicationFixedTest
     new ApplicationFixed().populateDAG(lma.getDAG(), new Configuration(false));
 
     DAG dag = lma.cloneDAG();
-    FixedTuplesInputOperator wordGenerator = (FixedTuplesInputOperator)dag.getOperatorMeta("WordGenerator").getOperator();
-    Assert.assertEquals("Queue Capacity", ApplicationFixed.QUEUE_CAPACITY, (int)dag.getMeta(wordGenerator).getMeta(wordGenerator.output).getValue(PortContext.QUEUE_CAPACITY));
+    FixedTuplesInputOperator wordGenerator = (FixedTuplesInputOperator)dag
+        .getOperatorMeta("WordGenerator").getOperator();
+    Assert.assertEquals("Queue Capacity", ApplicationFixed.QUEUE_CAPACITY, (int)dag.getMeta(wordGenerator)
+        .getMeta(wordGenerator.output).getValue(PortContext.QUEUE_CAPACITY));
 
     LocalMode.Controller lc = lma.getController();
     lc.run(60000);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/BenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/BenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/BenchmarkTest.java
index 9439243..0a21a7c 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/BenchmarkTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/BenchmarkTest.java
@@ -26,7 +26,6 @@ import org.slf4j.LoggerFactory;
 
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.LocalMode;
-import com.datatorrent.benchmark.Benchmark;
 
 /**
  * Test the DAG declaration in local mode.
@@ -38,7 +37,7 @@ public class BenchmarkTest
   {
     for (final Locality l : Locality.values()) {
       logger.debug("Running the with {} locality", l);
-      LocalMode.runApp(new Benchmark.AbstractApplication ()
+      LocalMode.runApp(new Benchmark.AbstractApplication()
       {
         @Override
         public Locality getLocality()

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/CouchBaseBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/CouchBaseBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/CouchBaseBenchmarkTest.java
index 7c9f892..6a1c968 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/CouchBaseBenchmarkTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/CouchBaseBenchmarkTest.java
@@ -18,14 +18,17 @@
  */
 package com.datatorrent.benchmark;
 
-import com.datatorrent.api.LocalMode;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+
+import org.junit.Test;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
-import org.junit.Test;
+
+import com.datatorrent.api.LocalMode;
 
 public class CouchBaseBenchmarkTest
 {
@@ -52,8 +55,7 @@ public class CouchBaseBenchmarkTest
       LocalMode.Controller lc = lm.getController();
       //lc.setHeartbeatMonitoringEnabled(false);
       lc.run(20000);
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       logger.info(ex.getCause());
     }
     is.close();
@@ -76,8 +78,7 @@ public class CouchBaseBenchmarkTest
       lm.prepareDAG(new CouchBaseAppInput(), conf);
       LocalMode.Controller lc = lm.getController();
       lc.run(20000);
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       logger.info(ex.getCause());
     }
     is.close();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloApp.java b/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloApp.java
index 5e407a0..e2936fe 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloApp.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloApp.java
@@ -27,6 +27,7 @@ import com.datatorrent.contrib.accumulo.AbstractAccumuloOutputOperator;
 import com.datatorrent.contrib.accumulo.AccumuloRowTupleGenerator;
 import com.datatorrent.contrib.accumulo.AccumuloTestHelper;
 import com.datatorrent.contrib.accumulo.AccumuloTuple;
+
 /**
  * BenchMark Results
  * -----------------
@@ -39,14 +40,16 @@ import com.datatorrent.contrib.accumulo.AccumuloTuple;
  *
  * @since 1.0.4
  */
-public class AccumuloApp implements StreamingApplication {
+public class AccumuloApp implements StreamingApplication
+{
 
   @Override
-  public void populateDAG(DAG dag, Configuration conf) {
+  public void populateDAG(DAG dag, Configuration conf)
+  {
     AccumuloTestHelper.getConnector();
     AccumuloTestHelper.clearTable();
     dag.setAttribute(DAG.APPLICATION_NAME, "AccumuloOutputTest");
-    AccumuloRowTupleGenerator rtg = dag.addOperator("tuplegenerator",AccumuloRowTupleGenerator.class);
+    AccumuloRowTupleGenerator rtg = dag.addOperator("tuplegenerator", AccumuloRowTupleGenerator.class);
     TestAccumuloOutputOperator taop = dag.addOperator("testaccumulooperator", TestAccumuloOutputOperator.class);
     dag.addStream("ss", rtg.outputPort, taop.input);
     com.datatorrent.api.Attribute.AttributeMap attributes = dag.getAttributes();
@@ -58,12 +61,14 @@ public class AccumuloApp implements StreamingApplication {
 
   }
 
-  public static class TestAccumuloOutputOperator extends AbstractAccumuloOutputOperator<AccumuloTuple> {
+  public static class TestAccumuloOutputOperator extends AbstractAccumuloOutputOperator<AccumuloTuple>
+  {
 
     @Override
-    public Mutation operationMutation(AccumuloTuple t) {
+    public Mutation operationMutation(AccumuloTuple t)
+    {
       Mutation mutation = new Mutation(t.getRow().getBytes());
-      mutation.put(t.getColFamily().getBytes(),t.getColName().getBytes(),t.getColValue().getBytes());
+      mutation.put(t.getColFamily().getBytes(), t.getColName().getBytes(), t.getColValue().getBytes());
       return mutation;
     }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloAppTest.java
index 8f0a1fd..8b47a9b 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloAppTest.java
@@ -22,9 +22,11 @@ import org.junit.Test;
 
 import com.datatorrent.api.LocalMode;
 
-public class AccumuloAppTest {
+public class AccumuloAppTest
+{
   @Test
-  public void testSomeMethod() throws Exception {
+  public void testSomeMethod() throws Exception
+  {
     LocalMode.runApp(new AccumuloApp(), 30000);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/aerospike/AerospikeBenchmarkAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/aerospike/AerospikeBenchmarkAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/aerospike/AerospikeBenchmarkAppTest.java
index 8f2e19f..14fe441 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/aerospike/AerospikeBenchmarkAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/aerospike/AerospikeBenchmarkAppTest.java
@@ -21,12 +21,13 @@ package com.datatorrent.benchmark.aerospike;
 import org.junit.Test;
 
 import com.datatorrent.api.LocalMode;
-import com.datatorrent.benchmark.aerospike.AerospikeOutputBenchmarkApplication;
 
-public class AerospikeBenchmarkAppTest {
+public class AerospikeBenchmarkAppTest
+{
 
   @Test
-  public void test() throws Exception {
+  public void test() throws Exception
+  {
 
     LocalMode.runApp(new AerospikeOutputBenchmarkApplication(), 10000);
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkTest.java
index c54cbdf..079d073 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkTest.java
@@ -18,10 +18,12 @@
  */
 package com.datatorrent.benchmark.algo;
 
-import com.datatorrent.api.LocalMode;
-import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
 /**
  * Test the DAG declaration in local mode.
  */

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/cassandra/CassandraApplicatonTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/cassandra/CassandraApplicatonTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/cassandra/CassandraApplicatonTest.java
index e85e38c..ec4f308 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/cassandra/CassandraApplicatonTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/cassandra/CassandraApplicatonTest.java
@@ -19,16 +19,18 @@
 package com.datatorrent.benchmark.cassandra;
 
 import org.junit.Test;
+
 import com.datatorrent.api.LocalMode;
-import com.datatorrent.benchmark.cassandra.CassandraOutputBenchmarkApplication;
 
 /**
  * Test the DAG declaration in local mode.
  */
-public class CassandraApplicatonTest {
+public class CassandraApplicatonTest
+{
 
   @Test
-  public void test() throws Exception {
+  public void test() throws Exception
+  {
     LocalMode.runApp(new CassandraOutputBenchmarkApplication(), 10000);
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseApplicationTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseApplicationTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseApplicationTest.java
index 1658ab1..32a4907 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseApplicationTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseApplicationTest.java
@@ -19,16 +19,19 @@
 package com.datatorrent.benchmark.hbase;
 
 import org.junit.Test;
+
 import com.datatorrent.api.LocalMode;
 
 /**
  * Test the DAG declaration in local mode.
  */
-public class HBaseApplicationTest {
+public class HBaseApplicationTest
+{
 
   @Test
-  public void test() throws Exception {
-     LocalMode.runApp(new HBaseCsvMappingApplication(), 20000);
+  public void test() throws Exception
+  {
+    LocalMode.runApp(new HBaseCsvMappingApplication(), 20000);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseCsvMappingApplication.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseCsvMappingApplication.java b/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseCsvMappingApplication.java
index 562559f..b61f1d3 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseCsvMappingApplication.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseCsvMappingApplication.java
@@ -37,7 +37,7 @@ import com.datatorrent.contrib.hbase.HBaseRowStringGenerator;
  *
  * @since 1.0.4
  */
-@ApplicationAnnotation(name="HBaseBenchmarkApp")
+@ApplicationAnnotation(name = "HBaseBenchmarkApp")
 public class HBaseCsvMappingApplication implements StreamingApplication
 {
   private final Locality locality = null;
@@ -47,15 +47,12 @@ public class HBaseCsvMappingApplication implements StreamingApplication
   {
     HBaseRowStringGenerator row = dag.addOperator("rand", new HBaseRowStringGenerator());
 
-
     HBaseCsvMappingPutOperator csvMappingPutOperator = dag.addOperator("HBaseoper", new HBaseCsvMappingPutOperator());
     csvMappingPutOperator.getStore().setTableName("table1");
     csvMappingPutOperator.getStore().setZookeeperQuorum("127.0.0.1");
     csvMappingPutOperator.getStore().setZookeeperClientPort(2181);
     csvMappingPutOperator.setMappingString("colfam0.street,colfam0.city,colfam0.state,row");
-    dag.addStream("hbasestream",row.outputPort, csvMappingPutOperator.input).setLocality(locality);
+    dag.addStream("hbasestream", row.outputPort, csvMappingPutOperator.input).setLocality(locality);
   }
 
-
-
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkTest.java
index fdab095..653c6f6 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkTest.java
@@ -18,18 +18,21 @@
  */
 package com.datatorrent.benchmark.hive;
 
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.netlet.util.DTThrowable;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.InputStream;
 import java.sql.SQLException;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.netlet.util.DTThrowable;
+
 public class HiveInsertBenchmarkTest
 {
   private static final Logger LOG = LoggerFactory.getLogger(HiveInsertBenchmarkTest.class);
@@ -41,26 +44,30 @@ public class HiveInsertBenchmarkTest
     InputStream inputStream = null;
     try {
       inputStream = new FileInputStream("src/site/conf/dt-site-hive.xml");
-    }
-    catch (FileNotFoundException ex) {
-      LOG.debug("Exception caught",ex);
+    } catch (FileNotFoundException ex) {
+      LOG.debug("Exception caught", ex);
     }
     conf.addResource(inputStream);
 
-    LOG.debug("conf properties are {}" , conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
-    LOG.debug("conf dburl is {}" , conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl"));
-    LOG.debug("conf filepath is {}" , conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.filepath"));
-    LOG.debug("maximum length is {}" , conf.get("dt.application.HiveInsertBenchmarkingApp.operator.RollingFsWriter.maxLength"));
-    LOG.debug("tablename is {}" , conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.tablename"));
-    LOG.debug("permission is {}",conf.get("dt.application.HiveInsertBenchmarkingApp.operator.RollingFsWriter.filePermission"));
+    LOG.debug("conf properties are {}",
+        conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
+    LOG.debug("conf dburl is {}",
+        conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl"));
+    LOG.debug("conf filepath is {}",
+        conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.filepath"));
+    LOG.debug("maximum length is {}",
+        conf.get("dt.application.HiveInsertBenchmarkingApp.operator.RollingFsWriter.maxLength"));
+    LOG.debug("tablename is {}",
+        conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.tablename"));
+    LOG.debug("permission is {}",
+        conf.get("dt.application.HiveInsertBenchmarkingApp.operator.RollingFsWriter.filePermission"));
     HiveInsertBenchmarkingApp app = new HiveInsertBenchmarkingApp();
     LocalMode lm = LocalMode.newInstance();
     try {
       lm.prepareDAG(app, conf);
       LocalMode.Controller lc = lm.getController();
       lc.run(120000);
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       DTThrowable.rethrow(ex);
     }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveMapBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveMapBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveMapBenchmarkTest.java
index 3acba3e..e0097c6 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveMapBenchmarkTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveMapBenchmarkTest.java
@@ -18,6 +18,8 @@
  */
 package com.datatorrent.benchmark.hive;
 
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.InputStream;
 import java.sql.SQLException;
 
@@ -28,16 +30,13 @@ import org.slf4j.LoggerFactory;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 
-
 import com.datatorrent.api.LocalMode;
-
 import com.datatorrent.netlet.util.DTThrowable;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 
 public class HiveMapBenchmarkTest
 {
   private static final Logger LOG = LoggerFactory.getLogger(HiveMapBenchmarkTest.class);
+
   @Test
   public void testMethod() throws SQLException
   {
@@ -45,18 +44,24 @@ public class HiveMapBenchmarkTest
     InputStream inputStream = null;
     try {
       inputStream = new FileInputStream("src/site/conf/dt-site-hive.xml");
-    }
-    catch (FileNotFoundException ex) {
-      LOG.debug("Exception caught {}",ex);
+    } catch (FileNotFoundException ex) {
+      LOG.debug("Exception caught {}", ex);
     }
     conf.addResource(inputStream);
-    LOG.debug("conf properties are {}" , conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
-    LOG.debug("conf dburl is {}" , conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl"));
-    LOG.debug("conf filepath is {}" , conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.filepath"));
-    LOG.debug("maximum length is {}" , conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.RollingFsMapWriter.maxLength"));
-    LOG.debug("tablename is {}" , conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.tablename"));
-    LOG.debug("permission is {}",conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.RollingFsMapWriter.filePermission"));
-    LOG.debug("delimiter is {}",conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.RollingFsMapWriter.delimiter"));
+    LOG.debug("conf properties are {}",
+        conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
+    LOG.debug("conf dburl is {}",
+        conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl"));
+    LOG.debug("conf filepath is {}",
+        conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.filepath"));
+    LOG.debug("maximum length is {}",
+        conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.RollingFsMapWriter.maxLength"));
+    LOG.debug("tablename is {}",
+        conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.tablename"));
+    LOG.debug("permission is {}",
+        conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.RollingFsMapWriter.filePermission"));
+    LOG.debug("delimiter is {}",
+        conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.RollingFsMapWriter.delimiter"));
 
     HiveMapInsertBenchmarkingApp app = new HiveMapInsertBenchmarkingApp();
     LocalMode lm = LocalMode.newInstance();
@@ -64,14 +69,11 @@ public class HiveMapBenchmarkTest
       lm.prepareDAG(app, conf);
       LocalMode.Controller lc = lm.getController();
       lc.run(30000);
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       DTThrowable.rethrow(ex);
     }
 
     IOUtils.closeQuietly(inputStream);
   }
 
-
-
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmarkTest.java
index e2d2f6a..6cb901a 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmarkTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmarkTest.java
@@ -43,8 +43,7 @@ public class KafkaInputBenchmarkTest
       lma.prepareDAG(new KafkaInputBenchmark(), conf);
       LocalMode.Controller lc = lma.getController();
       lc.run(30000);
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmarkTest.java
index d85372f..4de7193 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmarkTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmarkTest.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.LocalMode;
 
-
 public class KafkaOutputBenchmarkTest
 {
   @Test
@@ -44,8 +43,7 @@ public class KafkaOutputBenchmarkTest
       lma.prepareDAG(new KafkaInputBenchmark(), conf);
       LocalMode.Controller lc = lma.getController();
       lc.run(30000);
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmark.java b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmark.java
index 4c046fb..9201cd5 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmark.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmark.java
@@ -18,13 +18,16 @@
  */
 package com.datatorrent.benchmark.memsql;
 
-import com.datatorrent.api.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.contrib.memsql.MemsqlInputOperator;
 import com.datatorrent.lib.stream.DevNull;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * BenchMark Results
@@ -39,7 +42,7 @@ import org.slf4j.LoggerFactory;
  *
  * @since 1.0.5
  */
-@ApplicationAnnotation(name="MemsqlInputBenchmark")
+@ApplicationAnnotation(name = "MemsqlInputBenchmark")
 public class MemsqlInputBenchmark implements StreamingApplication
 {
   private static final Logger LOG = LoggerFactory.getLogger(MemsqlInputBenchmark.class);
@@ -50,13 +53,13 @@ public class MemsqlInputBenchmark implements StreamingApplication
   public void populateDAG(DAG dag, Configuration conf)
   {
     MemsqlInputOperator memsqlInputOperator = dag.addOperator("memsqlInputOperator",
-                                                                new MemsqlInputOperator());
+        new MemsqlInputOperator());
 
     DevNull<Object> devNull = dag.addOperator("devnull",
-                                      new DevNull<Object>());
+        new DevNull<Object>());
 
     dag.addStream("memsqlconnector",
-                  memsqlInputOperator.outputPort,
-                  devNull.data);
+        memsqlInputOperator.outputPort,
+        devNull.data);
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java
index 55fec7c..fa98a18 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java
@@ -18,26 +18,33 @@
  */
 package com.datatorrent.benchmark.memsql;
 
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.Operator.ProcessingMode;
-import com.datatorrent.netlet.util.DTThrowable;
-import com.datatorrent.contrib.memsql.*;
-import static com.datatorrent.contrib.memsql.AbstractMemsqlOutputOperatorTest.BATCH_SIZE;
-import static com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.*;
-import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.sql.SQLException;
 import java.util.Random;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Operator.ProcessingMode;
+import com.datatorrent.contrib.memsql.AbstractMemsqlOutputOperatorTest;
+import com.datatorrent.contrib.memsql.MemsqlPOJOOutputOperator;
+import com.datatorrent.contrib.memsql.MemsqlStore;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.netlet.util.DTThrowable;
+
+import static com.datatorrent.contrib.memsql.AbstractMemsqlOutputOperatorTest.BATCH_SIZE;
+import static com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.APP_ID;
+import static com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.OPERATOR_ID;
+
 public class MemsqlInputBenchmarkTest
 {
   private static final Logger LOG = LoggerFactory.getLogger(MemsqlInputBenchmarkTest.class);
@@ -52,28 +59,33 @@ public class MemsqlInputBenchmarkTest
 
     MemsqlStore memsqlStore = new MemsqlStore();
     memsqlStore.setDatabaseUrl(conf.get("dt.rootDbUrl"));
-    memsqlStore.setConnectionProperties(conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.connectionProperties"));
+    memsqlStore.setConnectionProperties(
+        conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.connectionProperties"));
 
     AbstractMemsqlOutputOperatorTest.memsqlInitializeDatabase(memsqlStore);
 
     MemsqlPOJOOutputOperator outputOperator = new MemsqlPOJOOutputOperator();
-    outputOperator.getStore().setDatabaseUrl(conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.dbUrl"));
-    outputOperator.getStore().setConnectionProperties(conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.connectionProperties"));
+    outputOperator.getStore().setDatabaseUrl(
+        conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.dbUrl"));
+    outputOperator.getStore().setConnectionProperties(
+        conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.connectionProperties"));
     outputOperator.setBatchSize(BATCH_SIZE);
 
     Random random = new Random();
-    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap =
+        new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
     attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_LEAST_ONCE);
     attributeMap.put(OperatorContext.ACTIVATION_WINDOW_ID, -1L);
     attributeMap.put(DAG.APPLICATION_ID, APP_ID);
-    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
+    OperatorContextTestHelper.TestIdOperatorContext context =
+        new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
 
     long seedSize = conf.getLong("dt.seedSize", SEED_SIZE);
 
     outputOperator.setup(context);
     outputOperator.beginWindow(0);
 
-    for(long valueCounter = 0;
+    for (long valueCounter = 0;
         valueCounter < seedSize;
         valueCounter++) {
       outputOperator.input.put(random.nextInt());
@@ -89,8 +101,7 @@ public class MemsqlInputBenchmarkTest
       lm.prepareDAG(app, conf);
       LocalMode.Controller lc = lm.getController();
       lc.run(20000);
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       DTThrowable.rethrow(ex);
     }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java
index 8534e20..297bc6d 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java
@@ -18,15 +18,17 @@
  */
 package com.datatorrent.benchmark.memsql;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.contrib.memsql.MemsqlPOJOOutputOperator;
 import com.datatorrent.lib.testbench.RandomEventGenerator;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * BenchMark Results
@@ -41,10 +43,10 @@ import org.slf4j.LoggerFactory;
  *
  * @since 1.0.5
  */
-@ApplicationAnnotation(name="MemsqlOutputBenchmark")
+@ApplicationAnnotation(name = "MemsqlOutputBenchmark")
 public class MemsqlOutputBenchmark implements StreamingApplication
 {
-  private static transient final Logger LOG = LoggerFactory.getLogger(MemsqlOutputBenchmark.class);
+  private static final transient Logger LOG = LoggerFactory.getLogger(MemsqlOutputBenchmark.class);
 
   public static final int DEFAULT_BATCH_SIZE = 1000;
   public static final int MAX_WINDOW_COUNT = 10000;
@@ -61,7 +63,7 @@ public class MemsqlOutputBenchmark implements StreamingApplication
     @Override
     public void emitTuples()
     {
-      if(done) {
+      if (done) {
         return;
       }
 
@@ -73,8 +75,7 @@ public class MemsqlOutputBenchmark implements StreamingApplication
     {
       try {
         super.endWindow();
-      }
-      catch(Exception e) {
+      } catch (Exception e) {
         done = true;
       }
     }
@@ -83,20 +84,21 @@ public class MemsqlOutputBenchmark implements StreamingApplication
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
-    CustomRandomEventGenerator randomEventGenerator = dag.addOperator("randomEventGenerator", new CustomRandomEventGenerator());
+    CustomRandomEventGenerator randomEventGenerator = dag.addOperator(
+        "randomEventGenerator", new CustomRandomEventGenerator());
     randomEventGenerator.setMaxCountOfWindows(MAX_WINDOW_COUNT);
     randomEventGenerator.setTuplesBlastIntervalMillis(TUPLE_BLAST_MILLIS);
     randomEventGenerator.setTuplesBlast(TUPLE_BLAST);
 
     LOG.debug("Before making output operator");
     MemsqlPOJOOutputOperator memsqlOutputOperator = dag.addOperator("memsqlOutputOperator",
-                                                                new MemsqlPOJOOutputOperator());
+        new MemsqlPOJOOutputOperator());
     LOG.debug("After making output operator");
 
     memsqlOutputOperator.setBatchSize(DEFAULT_BATCH_SIZE);
 
     dag.addStream("memsqlConnector",
-                  randomEventGenerator.integer_data,
-                  memsqlOutputOperator.input);
+        randomEventGenerator.integer_data,
+        memsqlOutputOperator.input);
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmarkTest.java
index bab0c9e..bf82ab3 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmarkTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmarkTest.java
@@ -18,20 +18,23 @@
  */
 package com.datatorrent.benchmark.memsql;
 
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.netlet.util.DTThrowable;
-import com.datatorrent.contrib.memsql.AbstractMemsqlOutputOperatorTest;
-import com.datatorrent.contrib.memsql.MemsqlStore;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.InputStream;
 import java.sql.SQLException;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.contrib.memsql.AbstractMemsqlOutputOperatorTest;
+import com.datatorrent.contrib.memsql.MemsqlStore;
+import com.datatorrent.netlet.util.DTThrowable;
+
 public class MemsqlOutputBenchmarkTest
 {
   private static final Logger LOG = LoggerFactory.getLogger(MemsqlOutputBenchmarkTest.class);
@@ -45,7 +48,8 @@ public class MemsqlOutputBenchmarkTest
 
     MemsqlStore memsqlStore = new MemsqlStore();
     memsqlStore.setDatabaseUrl(conf.get("dt.rootDbUrl"));
-    memsqlStore.setConnectionProperties(conf.get("dt.application.MemsqlOutputBenchmark.operator.memsqlOutputOperator.store.connectionProperties"));
+    memsqlStore.setConnectionProperties(
+        conf.get("dt.application.MemsqlOutputBenchmark.operator.memsqlOutputOperator.store.connectionProperties"));
 
     AbstractMemsqlOutputOperatorTest.memsqlInitializeDatabase(memsqlStore);
 
@@ -56,8 +60,7 @@ public class MemsqlOutputBenchmarkTest
       lm.prepareDAG(app, conf);
       LocalMode.Controller lc = lm.getController();
       lc.run(20000);
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       DTThrowable.rethrow(ex);
     }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkAppTest.java
index 9f82e79..d270e7f 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkAppTest.java
@@ -18,14 +18,17 @@
  */
 package com.datatorrent.benchmark.script;
 
-import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.LocalMode;
+
 /**
  * Benchmark Test for Ruby Operator in local mode.
  */
-public class RubyOperatorBenchmarkAppTest {
+public class RubyOperatorBenchmarkAppTest
+{
 
   @Test
   public void testApplication() throws Exception

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
index 7e64c5f..b87fec1 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
@@ -35,7 +35,6 @@ import org.apache.apex.malhar.lib.utils.serde.StringSerde;
 
 import com.datatorrent.lib.fileaccess.TFileImpl;
 
-
 public class SpillableDSBenchmarkTest
 {
   private static final Logger logger = LoggerFactory.getLogger(SpillableDSBenchmarkTest.class);
@@ -57,7 +56,6 @@ public class SpillableDSBenchmarkTest
   @Rule
   public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
 
-
   @Before
   public void setup()
   {
@@ -116,7 +114,8 @@ public class SpillableDSBenchmarkTest
       long spentTime = System.currentTimeMillis() - startTime;
       if (spentTime > outputTimes * 5000) {
         ++outputTimes;
-        logger.info("Total Statistics: Spent {} mills for {} operation. average/second: {}", spentTime, i, i * 1000 / spentTime);
+        logger.info("Total Statistics: Spent {} mills for {} operation. average/second: {}",
+            spentTime, i, i * 1000 / spentTime);
         checkEnvironment();
       }
     }
@@ -126,7 +125,6 @@ public class SpillableDSBenchmarkTest
         loopCount / spentTime);
   }
 
-
   public void putEntry(SpillableMapImpl<String, String> map)
   {
     map.put(keys[random.nextInt(keys.length)], values[random.nextInt(values.length)]);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
index 4f03a10..dc8f4b4 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
@@ -34,7 +34,6 @@ import com.datatorrent.benchmark.state.StoreOperator.ExecMode;
 /**
  * This is not a really unit test, but in fact a benchmark runner.
  * Provides this class to give developers the convenience to run in local IDE environment.
- *
  */
 public class ManagedStateBenchmarkAppTest extends ManagedStateBenchmarkApp
 {
@@ -91,8 +90,6 @@ public class ManagedStateBenchmarkAppTest extends ManagedStateBenchmarkApp
     lc.shutdown();
   }
 
-
-
   @Override
   public String getStoreBasePath(Configuration conf)
   {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierAppTest.java
index 14fd7e3..99d8a1f 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierAppTest.java
@@ -18,16 +18,19 @@
  */
 package com.datatorrent.benchmark.testbench;
 
-import com.datatorrent.api.LocalMode;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import org.apache.hadoop.conf.Configuration;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
 /**
  * Benchmark Test for EventClassifier Operator in local mode.
  */
@@ -47,9 +50,8 @@ public class EventClassifierAppTest
       lm.prepareDAG(new EventClassifierApp(), conf);
       LocalMode.Controller lc = lm.getController();
       lc.run(20000);
-    }
-    catch (Exception ex) {
-       logger.info(ex.getMessage());
+    } catch (Exception ex) {
+      logger.info(ex.getMessage());
     }
     is.close();
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleAppTest.java
index b793ecd..929d8bc 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleAppTest.java
@@ -18,16 +18,19 @@
  */
 package com.datatorrent.benchmark.testbench;
 
-import com.datatorrent.api.LocalMode;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import org.apache.hadoop.conf.Configuration;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
 /**
  * Benchmark Test for EventClassifierNumberToHashDouble Operator in local mode.
  */
@@ -48,9 +51,8 @@ public class EventClassifierNumberToHashDoubleAppTest
       lm.prepareDAG(new EventClassifierNumberToHashDoubleApp(), conf);
       LocalMode.Controller lc = lm.getController();
       lc.run(20000);
-    }
-    catch (Exception ex) {
-       logger.info(ex.getMessage());
+    } catch (Exception ex) {
+      logger.info(ex.getMessage());
     }
     is.close();
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventGeneratorAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventGeneratorAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventGeneratorAppTest.java
index 4808ff2..5a427a5 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventGeneratorAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventGeneratorAppTest.java
@@ -18,16 +18,19 @@
  */
 package com.datatorrent.benchmark.testbench;
 
-import com.datatorrent.api.LocalMode;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import org.apache.hadoop.conf.Configuration;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
 /**
  * Benchmark Test for EventGenerator Operator in local mode.
  */
@@ -50,9 +53,8 @@ public class EventGeneratorAppTest
       lm.prepareDAG(new EventGeneratorApp(), conf);
       LocalMode.Controller lc = lm.getController();
       lc.run(20000);
-    }
-    catch (Exception ex) {
-       logger.info(ex.getMessage());
+    } catch (Exception ex) {
+      logger.info(ex.getMessage());
     }
     is.close();
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventIncrementerAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventIncrementerAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventIncrementerAppTest.java
index 4384256..1a85a7b 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventIncrementerAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventIncrementerAppTest.java
@@ -18,16 +18,19 @@
  */
 package com.datatorrent.benchmark.testbench;
 
-import com.datatorrent.api.LocalMode;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import org.apache.hadoop.conf.Configuration;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
 /**
  * Benchmark Test for EventIncrementerApp Operator in local mode.
  */
@@ -48,9 +51,8 @@ public class EventIncrementerAppTest
       lm.prepareDAG(new EventIncrementerApp(), conf);
       LocalMode.Controller lc = lm.getController();
       lc.run(20000);
-    }
-    catch (Exception ex) {
-       logger.info(ex.getMessage());
+    } catch (Exception ex) {
+      logger.info(ex.getMessage());
     }
     is.close();
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilterClassifierAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilterClassifierAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilterClassifierAppTest.java
index e1368ba..9419022 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilterClassifierAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilterClassifierAppTest.java
@@ -19,6 +19,8 @@
 package com.datatorrent.benchmark.testbench;
 
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.io.InputStream;
 
 import org.junit.Test;
@@ -28,8 +30,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.LocalMode;
-import java.io.FileNotFoundException;
-import java.io.IOException;
+
 /**
  * Benchmark Test for FilterClassifierApp Operator in local mode.
  */
@@ -49,9 +50,8 @@ public class FilterClassifierAppTest
       lm.prepareDAG(new FilterClassifierApp(), conf);
       LocalMode.Controller lc = lm.getController();
       lc.run(20000);
-    }
-    catch (Exception ex) {
-       logger.info(ex.getMessage());
+    } catch (Exception ex) {
+      logger.info(ex.getMessage());
     }
     is.close();
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierAppTest.java
index 95453f2..977d6b7 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierAppTest.java
@@ -18,16 +18,19 @@
  */
 package com.datatorrent.benchmark.testbench;
 
-import com.datatorrent.api.LocalMode;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import org.apache.hadoop.conf.Configuration;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
 /**
  * Benchmark Test for FilteredEventClassifierApp Operator in local mode.
  */
@@ -47,9 +50,8 @@ public class FilteredEventClassifierAppTest
       lm.prepareDAG(new FilteredEventClassifierApp(), conf);
       LocalMode.Controller lc = lm.getController();
       lc.run(20000);
-    }
-    catch (Exception ex) {
-       logger.info(ex.getMessage());
+    } catch (Exception ex) {
+      logger.info(ex.getMessage());
     }
     is.close();
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/ThroughputCounterAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/ThroughputCounterAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/ThroughputCounterAppTest.java
index cc180f0..92ca0fd 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/ThroughputCounterAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/ThroughputCounterAppTest.java
@@ -18,16 +18,19 @@
  */
 package com.datatorrent.benchmark.testbench;
 
-import com.datatorrent.api.LocalMode;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import org.apache.hadoop.conf.Configuration;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
 /**
  * Benchmark Test for ThroughputCounterApp Operator in local mode.
  */
@@ -47,9 +50,8 @@ public class ThroughputCounterAppTest
       lm.prepareDAG(new ThroughputCounterApp(), conf);
       LocalMode.Controller lc = lm.getController();
       lc.run(20000);
-    }
-    catch (Exception ex) {
-       logger.info(ex.getMessage());
+    } catch (Exception ex) {
+      logger.info(ex.getMessage());
     }
     is.close();
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java
index 98ecf67..157accc 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java
@@ -40,7 +40,6 @@ public class GenericSerdePerformanceTest
   private Random random = new Random();
   private int serdeDataSize = 1000000;
 
-
   @Test
   public void testCompareSerdeForString()
   {
@@ -74,7 +73,6 @@ public class GenericSerdePerformanceTest
     buffer.release();
   }
 
-
   @Test
   public void testCompareSerdeForRealCase()
   {
@@ -88,7 +86,6 @@ public class GenericSerdePerformanceTest
     long genericSerdeCost = System.currentTimeMillis() - beginTime;
     logger.info("Generic Serde cost for ImmutablePair: {}", genericSerdeCost);
 
-
     beginTime = System.currentTimeMillis();
     Kryo kryo = new Kryo();
     for (int i = 0; i < serdeDataSize; ++i) {
@@ -99,7 +96,6 @@ public class GenericSerdePerformanceTest
     long kryoSerdeCost = System.currentTimeMillis() - beginTime;
     logger.info("Kryo Serde cost for ImmutablePair without class info: {}", kryoSerdeCost);
 
-
     beginTime = System.currentTimeMillis();
     Kryo kryo1 = new Kryo();
     for (int i = 0; i < serdeDataSize; ++i) {
@@ -113,6 +109,7 @@ public class GenericSerdePerformanceTest
 
   protected ImmutablePair generatePair(long now)
   {
-    return new ImmutablePair(new Window.TimeWindow(now + random.nextInt(100), random.nextInt(100)), "" + random.nextInt(1000));
+    return new ImmutablePair(new Window.TimeWindow(now + random.nextInt(100),
+        random.nextInt(100)), "" + random.nextInt(1000));
   }
 }


[3/3] apex-malhar git commit: Merge commit 'refs/pull/539/head' of https://github.com/apache/apex-malhar

Posted by th...@apache.org.
Merge commit 'refs/pull/539/head' of https://github.com/apache/apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/e22ea0de
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/e22ea0de
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/e22ea0de

Branch: refs/heads/master
Commit: e22ea0de192967e2006071dd191cb25d4173cf31
Parents: 623b803 5528a4c
Author: Thomas Weise <th...@apache.org>
Authored: Sun Jan 22 11:22:32 2017 -0800
Committer: Thomas Weise <th...@apache.org>
Committed: Sun Jan 22 11:22:32 2017 -0800

----------------------------------------------------------------------
 benchmark/pom.xml                               |  8 ----
 .../datatorrent/benchmark/ApplicationFixed.java | 15 +++---
 .../com/datatorrent/benchmark/Benchmark.java    | 17 +++----
 .../benchmark/CouchBaseAppInput.java            |  6 +--
 .../benchmark/CouchBaseAppOutput.java           |  8 ++--
 .../benchmark/CouchBaseInputOperator.java       | 17 ++++---
 .../benchmark/FixedTuplesInputOperator.java     | 11 +++--
 .../datatorrent/benchmark/RandomMapOutput.java  | 40 +++++++++-------
 .../benchmark/RandomWordInputModule.java        | 11 +++--
 .../benchmark/WordCountOperator.java            | 10 ++--
 .../AerospikeOutputBenchmarkApplication.java    | 17 ++++---
 .../aerospike/AerospikeOutputOperator.java      | 13 ++++--
 .../UniqueValueCountBenchmarkApplication.java   | 21 +++++----
 .../CassandraOutputBenchmarkApplication.java    | 17 ++++---
 .../cassandra/CassandraOutputOperator.java      | 14 +++---
 .../benchmark/fs/FSByteOutputOperator.java      |  9 ++--
 .../benchmark/fs/FSOutputOperatorBenchmark.java | 26 +++++++----
 .../hive/HiveInsertBenchmarkingApp.java         | 28 ++++++-----
 .../hive/HiveMapInsertBenchmarkingApp.java      | 31 +++++++------
 .../kafka/BenchmarkKafkaInputOperator.java      |  6 +--
 ...nchmarkPartitionableKafkaOutputOperator.java | 44 +++++++++---------
 .../benchmark/kafka/KafkaInputBenchmark.java    | 23 ++++-----
 .../benchmark/kafka/KafkaOutputBenchmark.java   |  5 +-
 .../benchmark/kafka/KafkaTestPartitioner.java   |  7 +--
 .../RubyOperatorBenchmarkApplication.java       | 11 +++--
 .../spillable/SpillableTestOperator.java        |  3 +-
 .../state/ManagedStateBenchmarkApp.java         |  3 +-
 .../benchmark/state/StoreOperator.java          |  9 ++--
 .../stream/DevNullCounterBenchmark.java         | 13 +++---
 .../benchmark/stream/IntegerOperator.java       | 17 ++++---
 .../benchmark/stream/StreamDuplicaterApp.java   | 21 +++++----
 .../benchmark/stream/StreamMergeApp.java        |  5 +-
 .../benchmark/testbench/EventClassifierApp.java | 11 +++--
 .../EventClassifierNumberToHashDoubleApp.java   | 18 ++++---
 .../benchmark/testbench/EventGeneratorApp.java  |  9 ++--
 .../testbench/EventIncrementerApp.java          | 22 +++++----
 .../testbench/FilterClassifierApp.java          | 15 +++---
 .../testbench/FilteredEventClassifierApp.java   |  8 ++--
 .../benchmark/testbench/HashMapOperator.java    | 37 +++++++++------
 .../testbench/RandomEventGeneratorApp.java      |  8 ++--
 .../testbench/SeedEventGeneratorApp.java        | 14 ++++--
 .../testbench/ThroughputCounterApp.java         | 13 ++++--
 .../AbstractWindowedOperatorBenchmarkApp.java   | 10 ++--
 .../KeyedWindowedOperatorBenchmarkApp.java      |  9 ++--
 .../window/WindowedOperatorBenchmarkApp.java    |  6 ++-
 .../benchmark/ApplicationFixedTest.java         | 17 ++++---
 .../datatorrent/benchmark/BenchmarkTest.java    |  3 +-
 .../benchmark/CouchBaseBenchmarkTest.java       | 13 +++---
 .../benchmark/accumulo/AccumuloApp.java         | 17 ++++---
 .../benchmark/accumulo/AccumuloAppTest.java     |  8 ++--
 .../aerospike/AerospikeBenchmarkAppTest.java    |  7 +--
 .../algo/UniqueValueCountBenchmarkTest.java     |  6 ++-
 .../cassandra/CassandraApplicatonTest.java      |  8 ++--
 .../benchmark/hbase/HBaseApplicationTest.java   |  9 ++--
 .../hbase/HBaseCsvMappingApplication.java       |  7 +--
 .../benchmark/hive/HiveInsertBenchmarkTest.java | 37 +++++++++------
 .../benchmark/hive/HiveMapBenchmarkTest.java    | 38 ++++++++-------
 .../kafka/KafkaInputBenchmarkTest.java          |  3 +-
 .../kafka/KafkaOutputBenchmarkTest.java         |  4 +-
 .../benchmark/memsql/MemsqlInputBenchmark.java  | 21 +++++----
 .../memsql/MemsqlInputBenchmarkTest.java        | 49 ++++++++++++--------
 .../benchmark/memsql/MemsqlOutputBenchmark.java | 26 ++++++-----
 .../memsql/MemsqlOutputBenchmarkTest.java       | 21 +++++----
 .../script/RubyOperatorBenchmarkAppTest.java    |  7 ++-
 .../spillable/SpillableDSBenchmarkTest.java     |  6 +--
 .../state/ManagedStateBenchmarkAppTest.java     |  3 --
 .../testbench/EventClassifierAppTest.java       | 12 +++--
 ...ventClassifierNumberToHashDoubleAppTest.java | 12 +++--
 .../testbench/EventGeneratorAppTest.java        | 12 +++--
 .../testbench/EventIncrementerAppTest.java      | 12 +++--
 .../testbench/FilterClassifierAppTest.java      | 10 ++--
 .../FilteredEventClassifierAppTest.java         | 12 +++--
 .../testbench/ThroughputCounterAppTest.java     | 12 +++--
 .../util/serde/GenericSerdePerformanceTest.java |  7 +--
 74 files changed, 608 insertions(+), 457 deletions(-)
----------------------------------------------------------------------



[2/3] apex-malhar git commit: fixing all checkstyle violations, delete maxAllowedViolations from pom

Posted by th...@apache.org.
fixing all checkstyle violations,
delete maxAllowedViolations from pom


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/5528a4c6
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/5528a4c6
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/5528a4c6

Branch: refs/heads/master
Commit: 5528a4c639a87dbfaba4a2bae68ac99971c66224
Parents: 4cbbb75
Author: Apex Dev <de...@apex.apache.org>
Authored: Wed Jan 18 14:43:33 2017 -0800
Committer: Oliver W <ol...@datatorrent.com>
Committed: Fri Jan 20 11:12:20 2017 -0800

----------------------------------------------------------------------
 benchmark/pom.xml                               |  8 ----
 .../datatorrent/benchmark/ApplicationFixed.java | 15 +++---
 .../com/datatorrent/benchmark/Benchmark.java    | 17 +++----
 .../benchmark/CouchBaseAppInput.java            |  6 +--
 .../benchmark/CouchBaseAppOutput.java           |  8 ++--
 .../benchmark/CouchBaseInputOperator.java       | 17 ++++---
 .../benchmark/FixedTuplesInputOperator.java     | 11 +++--
 .../datatorrent/benchmark/RandomMapOutput.java  | 40 +++++++++-------
 .../benchmark/RandomWordInputModule.java        | 11 +++--
 .../benchmark/WordCountOperator.java            | 10 ++--
 .../AerospikeOutputBenchmarkApplication.java    | 17 ++++---
 .../aerospike/AerospikeOutputOperator.java      | 13 ++++--
 .../UniqueValueCountBenchmarkApplication.java   | 21 +++++----
 .../CassandraOutputBenchmarkApplication.java    | 17 ++++---
 .../cassandra/CassandraOutputOperator.java      | 14 +++---
 .../benchmark/fs/FSByteOutputOperator.java      |  9 ++--
 .../benchmark/fs/FSOutputOperatorBenchmark.java | 26 +++++++----
 .../hive/HiveInsertBenchmarkingApp.java         | 28 ++++++-----
 .../hive/HiveMapInsertBenchmarkingApp.java      | 31 +++++++------
 .../kafka/BenchmarkKafkaInputOperator.java      |  6 +--
 ...nchmarkPartitionableKafkaOutputOperator.java | 44 +++++++++---------
 .../benchmark/kafka/KafkaInputBenchmark.java    | 23 ++++-----
 .../benchmark/kafka/KafkaOutputBenchmark.java   |  5 +-
 .../benchmark/kafka/KafkaTestPartitioner.java   |  7 +--
 .../RubyOperatorBenchmarkApplication.java       | 11 +++--
 .../spillable/SpillableTestOperator.java        |  3 +-
 .../state/ManagedStateBenchmarkApp.java         |  3 +-
 .../benchmark/state/StoreOperator.java          |  9 ++--
 .../stream/DevNullCounterBenchmark.java         | 13 +++---
 .../benchmark/stream/IntegerOperator.java       | 17 ++++---
 .../benchmark/stream/StreamDuplicaterApp.java   | 21 +++++----
 .../benchmark/stream/StreamMergeApp.java        |  5 +-
 .../benchmark/testbench/EventClassifierApp.java | 11 +++--
 .../EventClassifierNumberToHashDoubleApp.java   | 18 ++++---
 .../benchmark/testbench/EventGeneratorApp.java  |  9 ++--
 .../testbench/EventIncrementerApp.java          | 22 +++++----
 .../testbench/FilterClassifierApp.java          | 15 +++---
 .../testbench/FilteredEventClassifierApp.java   |  8 ++--
 .../benchmark/testbench/HashMapOperator.java    | 37 +++++++++------
 .../testbench/RandomEventGeneratorApp.java      |  8 ++--
 .../testbench/SeedEventGeneratorApp.java        | 14 ++++--
 .../testbench/ThroughputCounterApp.java         | 13 ++++--
 .../AbstractWindowedOperatorBenchmarkApp.java   | 10 ++--
 .../KeyedWindowedOperatorBenchmarkApp.java      |  9 ++--
 .../window/WindowedOperatorBenchmarkApp.java    |  6 ++-
 .../benchmark/ApplicationFixedTest.java         | 17 ++++---
 .../datatorrent/benchmark/BenchmarkTest.java    |  3 +-
 .../benchmark/CouchBaseBenchmarkTest.java       | 13 +++---
 .../benchmark/accumulo/AccumuloApp.java         | 17 ++++---
 .../benchmark/accumulo/AccumuloAppTest.java     |  8 ++--
 .../aerospike/AerospikeBenchmarkAppTest.java    |  7 +--
 .../algo/UniqueValueCountBenchmarkTest.java     |  6 ++-
 .../cassandra/CassandraApplicatonTest.java      |  8 ++--
 .../benchmark/hbase/HBaseApplicationTest.java   |  9 ++--
 .../hbase/HBaseCsvMappingApplication.java       |  7 +--
 .../benchmark/hive/HiveInsertBenchmarkTest.java | 37 +++++++++------
 .../benchmark/hive/HiveMapBenchmarkTest.java    | 38 ++++++++-------
 .../kafka/KafkaInputBenchmarkTest.java          |  3 +-
 .../kafka/KafkaOutputBenchmarkTest.java         |  4 +-
 .../benchmark/memsql/MemsqlInputBenchmark.java  | 21 +++++----
 .../memsql/MemsqlInputBenchmarkTest.java        | 49 ++++++++++++--------
 .../benchmark/memsql/MemsqlOutputBenchmark.java | 26 ++++++-----
 .../memsql/MemsqlOutputBenchmarkTest.java       | 21 +++++----
 .../script/RubyOperatorBenchmarkAppTest.java    |  7 ++-
 .../spillable/SpillableDSBenchmarkTest.java     |  6 +--
 .../state/ManagedStateBenchmarkAppTest.java     |  3 --
 .../testbench/EventClassifierAppTest.java       | 12 +++--
 ...ventClassifierNumberToHashDoubleAppTest.java | 12 +++--
 .../testbench/EventGeneratorAppTest.java        | 12 +++--
 .../testbench/EventIncrementerAppTest.java      | 12 +++--
 .../testbench/FilterClassifierAppTest.java      | 10 ++--
 .../FilteredEventClassifierAppTest.java         | 12 +++--
 .../testbench/ThroughputCounterAppTest.java     | 12 +++--
 .../util/serde/GenericSerdePerformanceTest.java |  7 +--
 74 files changed, 608 insertions(+), 457 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/pom.xml
----------------------------------------------------------------------
diff --git a/benchmark/pom.xml b/benchmark/pom.xml
index b2e0981..4bbd5ac 100644
--- a/benchmark/pom.xml
+++ b/benchmark/pom.xml
@@ -143,14 +143,6 @@
            <skip>true</skip>
          </configuration>
        </plugin>
-       <plugin>
-         <groupId>org.apache.maven.plugins</groupId>
-         <artifactId>maven-checkstyle-plugin</artifactId>
-         <configuration>
-           <maxAllowedViolations>281</maxAllowedViolations>
-           <logViolationsToConsole>${checkstyle.console}</logViolationsToConsole>
-         </configuration>
-       </plugin>
     </plugins>
   </build>
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java b/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java
index 53f01fc..aa10eea 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java
@@ -18,13 +18,15 @@
  */
 package com.datatorrent.benchmark;
 
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.DAG;
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context.PortContext;
+
+import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.api.StreamingApplication;
 
-import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
 
 /**
  * Example of application configuration in Java.
@@ -34,7 +36,7 @@ import org.apache.hadoop.conf.Configuration;
  *
  * @since 0.3.2
  */
-@ApplicationAnnotation(name="PerformanceBenchmarkForFixedNumberOfTuples")
+@ApplicationAnnotation(name = "PerformanceBenchmarkForFixedNumberOfTuples")
 public class ApplicationFixed implements StreamingApplication
 {
   private final Locality locality = null;
@@ -44,7 +46,8 @@ public class ApplicationFixed implements StreamingApplication
   public void populateDAG(DAG dag, Configuration conf)
   {
     FixedTuplesInputOperator wordGenerator = dag.addOperator("WordGenerator", FixedTuplesInputOperator.class);
-    dag.getMeta(wordGenerator).getMeta(wordGenerator.output).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+    dag.getMeta(wordGenerator).getMeta(wordGenerator.output).getAttributes()
+        .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
     wordGenerator.setCount(500000);
 
     WordCountOperator<byte[]> counter = dag.addOperator("Counter", new WordCountOperator<byte[]>());

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java
index 5649914..d8d51b8 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java
@@ -52,10 +52,10 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
  * @since 0.9.0
  */
 
-@ApplicationAnnotation(name="PerformanceBenchmarkingApp")
+@ApplicationAnnotation(name = "PerformanceBenchmarkingApp")
 public abstract class Benchmark
 {
-  static abstract class AbstractApplication implements StreamingApplication
+  abstract static class AbstractApplication implements StreamingApplication
   {
     public static final int QUEUE_CAPACITY = 32 * 1024;
 
@@ -63,7 +63,8 @@ public abstract class Benchmark
     public void populateDAG(DAG dag, Configuration conf)
     {
       RandomWordInputModule wordGenerator = dag.addOperator("wordGenerator", RandomWordInputModule.class);
-      dag.getMeta(wordGenerator).getMeta(wordGenerator.output).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+      dag.getMeta(wordGenerator).getMeta(wordGenerator.output).getAttributes()
+          .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
 
       WordCountOperator<byte[]> counter = dag.addOperator("counter", new WordCountOperator<byte[]>());
       dag.getMeta(counter).getMeta(counter.input).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
@@ -77,7 +78,7 @@ public abstract class Benchmark
   /**
    * Let the engine decide how to best place the 2 operators.
    */
-  @ApplicationAnnotation(name="PerformanceBenchmarkNoLocality")
+  @ApplicationAnnotation(name = "PerformanceBenchmarkNoLocality")
   public static class NoLocality extends AbstractApplication
   {
     @Override
@@ -92,7 +93,7 @@ public abstract class Benchmark
    * Place the 2 operators so that they are in the same Rack.
    * The operators are requested to be deployed on different machines.
    */
-  @ApplicationAnnotation(name="PerformanceBenchmarkRackLocal")
+  @ApplicationAnnotation(name = "PerformanceBenchmarkRackLocal")
   public static class RackLocal extends AbstractApplication
   {
     @Override
@@ -107,7 +108,7 @@ public abstract class Benchmark
    * Place the 2 operators so that they are in the same node.
    * The operators are requested to be deployed as different processes within the same machine.
    */
-  @ApplicationAnnotation(name="PerformanceBenchmarkNodeLocal")
+  @ApplicationAnnotation(name = "PerformanceBenchmarkNodeLocal")
   public static class NodeLocal extends AbstractApplication
   {
     @Override
@@ -122,7 +123,7 @@ public abstract class Benchmark
    * Place the 2 operators so that they are in the same container.
    * The operators are deployed as different threads in the same process.
    */
-  @ApplicationAnnotation(name="PerformanceBenchmarkContainerLocal")
+  @ApplicationAnnotation(name = "PerformanceBenchmarkContainerLocal")
   public static class ContainerLocal extends AbstractApplication
   {
     @Override
@@ -136,7 +137,7 @@ public abstract class Benchmark
   /**
    * Place the 2 operators so that they are in the same thread.
    */
-  @ApplicationAnnotation(name="PerformanceBenchmarkThreadLocal")
+  @ApplicationAnnotation(name = "PerformanceBenchmarkThreadLocal")
   public static class ThreadLocal extends AbstractApplication
   {
     @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java
index 6096530..bf5b876 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java
@@ -18,14 +18,14 @@
  */
 package com.datatorrent.benchmark;
 
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
 import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 
 /**
- *
  * Application to benchmark the performance of couchbase input operator.
  * The number of tuples processed per second were around 9000.
  *

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java
index f789d08..4f12791 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java
@@ -18,12 +18,14 @@
  */
 package com.datatorrent.benchmark;
 
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.lib.testbench.RandomEventGenerator;
 import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
+
+import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
 
 /**
  *

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java
index 923b588..8ae0a94 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java
@@ -18,12 +18,14 @@
  */
 package com.datatorrent.benchmark;
 
-import com.datatorrent.contrib.couchbase.AbstractCouchBaseInputOperator;
-import com.datatorrent.contrib.couchbase.CouchBaseWindowStore;
 import java.util.ArrayList;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.contrib.couchbase.AbstractCouchBaseInputOperator;
+import com.datatorrent.contrib.couchbase.CouchBaseWindowStore;
+
 /**
  * <p>CouchBaseInputOperator class.</p>
  *
@@ -32,14 +34,15 @@ import org.slf4j.LoggerFactory;
 public class CouchBaseInputOperator extends AbstractCouchBaseInputOperator<String>
 {
   private static final Logger logger = LoggerFactory.getLogger(CouchBaseWindowStore.class);
+
   @Override
   public String getTuple(Object object)
   {
-    if(object!=null)
-    return object.toString();
-    else{
-     logger.info("Object returned is null");
-     return "null";
+    if (object != null) {
+      return object.toString();
+    } else {
+      logger.info("Object returned is null");
+      return "null";
     }
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java
index ad7f8c1..f2582bd 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java
@@ -18,14 +18,15 @@
  */
 package com.datatorrent.benchmark;
 
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.Context.OperatorContext;
+import java.util.ArrayList;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
+import com.datatorrent.api.Context.OperatorContext;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
 
 /**
  * <p>FixedTuplesInputOperator class.</p>
@@ -44,7 +45,7 @@ public class FixedTuplesInputOperator implements InputOperator
   {
     if (firstTime) {
       long start = System.currentTimeMillis();
-      for (int i = count; i-- > 0;) {
+      for (int i = count; i-- > 0; ) {
         output.emit(new byte[64]);
       }
       firstTime = false;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java b/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java
index 106bd79..3342771 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java
@@ -19,20 +19,23 @@
 package com.datatorrent.benchmark;
 
 import java.util.HashMap;
-import com.datatorrent.common.util.BaseOperator;
+
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * Operator that outputs random values in a map.
  *
  * @since 1.0.4
  */
-public class RandomMapOutput extends BaseOperator {
+public class RandomMapOutput extends BaseOperator
+{
 
-  public final transient DefaultOutputPort<HashMap<String, Object>> map_data = new DefaultOutputPort<HashMap<String, Object>>();
+  public final transient DefaultOutputPort<HashMap<String, Object>> map_data =
+      new DefaultOutputPort<HashMap<String, Object>>();
   public final transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>()
-      {
+  {
     @Override
     public void process(Integer tuple)
     {
@@ -40,22 +43,25 @@ public class RandomMapOutput extends BaseOperator {
       map.put(key, tuple);
       RandomMapOutput.this.process(map);
     }
-      };
+  };
 
-      private String key;
+  private String key;
 
-      public String getKey() {
-        return key;
-      }
+  public String getKey()
+  {
+    return key;
+  }
 
-      public void setKey(String key) {
-        this.key = key;
-      }
+  public void setKey(String key)
+  {
+    this.key = key;
+  }
 
-      public void process(HashMap<String, Object> tuple) {
+  public void process(HashMap<String, Object> tuple)
+  {
 
-        if (map_data.isConnected()) {
-          map_data.emit(tuple);
-        }
-      }
+    if (map_data.isConnected()) {
+      map_data.emit(tuple);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java b/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java
index 11c7568..7d02de2 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java
@@ -18,10 +18,12 @@
  */
 package com.datatorrent.benchmark;
 
+import javax.validation.constraints.Min;
+
+import com.datatorrent.api.Context.OperatorContext;
+
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.Context.OperatorContext;
-import javax.validation.constraints.Min;
 
 /**
  * <p>
@@ -87,7 +89,6 @@ public class RandomWordInputModule implements InputOperator
     return emitSameTuple;
   }
 
-
   /**
    * Emits byte array of specified size.
    * Emits either the same byte array or creates new byte array every time
@@ -103,11 +104,11 @@ public class RandomWordInputModule implements InputOperator
     final boolean EMIT_SAME_TUPLE_COPY = emitSameTuple;
     if (firstTime) {
       if (EMIT_SAME_TUPLE_COPY) {
-        for (int i = count--; i-- > 0;) {
+        for (int i = count--; i-- > 0; ) {
           output.emit(sameTupleArray);
         }
       } else {
-        for (int i = count--; i-- > 0;) {
+        for (int i = count--; i-- > 0; ) {
           output.emit(new byte[TUPLE_SIZE_COPY]);
         }
       }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java
index 098ed42..6e91482 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java
@@ -21,14 +21,17 @@ package com.datatorrent.benchmark;
 /*
  * To change this template, choose Tools | Templates and open the template in the editor.
  */
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.Context.OperatorContext;
 
 import java.util.ArrayList;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.Context.OperatorContext;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+
 /**
  * <p>WordCountOperator class.</p>
  *
@@ -84,5 +87,6 @@ public class WordCountOperator<T> implements Operator
     counts = new ArrayList<Integer>();
     millis = new ArrayList<Integer>();
   }
+
   private static final Logger logger = LoggerFactory.getLogger(WordCountOperator.class);
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java b/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java
index a70aae6..0a880fd 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java
@@ -18,15 +18,16 @@
  */
 package com.datatorrent.benchmark.aerospike;
 
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.contrib.aerospike.AerospikeTransactionalStore;
 import com.datatorrent.lib.testbench.RandomEventGenerator;
-import org.apache.hadoop.conf.Configuration;
+
 /**
- *
  * Application to benchmark the performance of aerospike output operator.
  * The operator was tested on DT cluster and the number of tuples processed
  * by the operator per second were around 12,000
@@ -34,16 +35,18 @@ import org.apache.hadoop.conf.Configuration;
  * @since 1.0.4
  */
 
-
-@ApplicationAnnotation(name="AerospikeOutputOperatorBenchmark")
-public class AerospikeOutputBenchmarkApplication implements StreamingApplication {
+@ApplicationAnnotation(name = "AerospikeOutputOperatorBenchmark")
+public class AerospikeOutputBenchmarkApplication implements StreamingApplication
+{
 
   private final String NODE = "127.0.0.1";
   private final int PORT = 3000;
   private final String NAMESPACE = "test";
   private final Locality locality = null;
+
   @Override
-  public void populateDAG(DAG dag, Configuration conf) {
+  public void populateDAG(DAG dag, Configuration conf)
+  {
 
     RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
     rand.setMaxvalue(3000);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java
index 210e086..f9ee689 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java
@@ -23,25 +23,28 @@ import java.util.List;
 import com.aerospike.client.AerospikeException;
 import com.aerospike.client.Bin;
 import com.aerospike.client.Key;
-import com.datatorrent.contrib.aerospike.AbstractAerospikeTransactionalPutOperator;
 
+import com.datatorrent.contrib.aerospike.AbstractAerospikeTransactionalPutOperator;
 
 /**
  * <p>AerospikeOutputOperator class.</p>
  *
  * @since 1.0.4
  */
-public class AerospikeOutputOperator extends AbstractAerospikeTransactionalPutOperator<Integer>{
+public class AerospikeOutputOperator extends AbstractAerospikeTransactionalPutOperator<Integer>
+{
 
   private final String KEYSPACE = "test";
   private final String SET_NAME = "Aerospike_Output";
   private int id = 0;
+
   @Override
   protected Key getUpdatedBins(Integer tuple, List<Bin> bins)
-      throws AerospikeException {
+    throws AerospikeException
+  {
 
-    Key key = new Key(KEYSPACE,SET_NAME,id++);
-    bins.add(new Bin("ID",tuple));
+    Key key = new Key(KEYSPACE, SET_NAME, id++);
+    bins.add(new Bin("ID", tuple));
     return key;
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java b/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java
index f522396..f74311e 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java
@@ -18,23 +18,22 @@
  */
 package com.datatorrent.benchmark.algo;
 
-
 import org.apache.hadoop.conf.Configuration;
 
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+
 import com.datatorrent.lib.algo.UniqueCounter;
 import com.datatorrent.lib.converter.MapToKeyHashValuePairConverter;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.common.partitioner.StatelessPartitioner;
 
 import com.datatorrent.lib.stream.Counter;
 import com.datatorrent.lib.testbench.RandomEventGenerator;
 
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-
 /**
  * Application to demonstrate PartitionableUniqueCount operator. <br>
  * The input operator generate random keys, which is sent to
@@ -63,9 +62,11 @@ public class UniqueValueCountBenchmarkApplication implements StreamingApplicatio
 
     /* Initialize with three partition to start with */
     UniqueCounter<Integer> uniqCount = dag.addOperator("uniqevalue", new UniqueCounter<Integer>());
-    MapToKeyHashValuePairConverter<Integer, Integer> converter = dag.addOperator("converter", new MapToKeyHashValuePairConverter());
+    MapToKeyHashValuePairConverter<Integer, Integer> converter =
+        dag.addOperator("converter", new MapToKeyHashValuePairConverter());
 
-    dag.setAttribute(uniqCount, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<UniqueCounter<Integer>>(3));
+    dag.setAttribute(uniqCount, Context.OperatorContext.PARTITIONER,
+        new StatelessPartitioner<UniqueCounter<Integer>>(3));
     dag.setInputPortAttribute(uniqCount.data, Context.PortContext.PARTITION_PARALLEL, true);
     uniqCount.setCumulative(false);
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java b/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java
index dead2cd..46d503f 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java
@@ -19,24 +19,27 @@
 package com.datatorrent.benchmark.cassandra;
 
 import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.lib.testbench.RandomEventGenerator;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
+
 import com.datatorrent.api.annotation.ApplicationAnnotation;
+
 import com.datatorrent.contrib.cassandra.CassandraTransactionalStore;
 
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+
 /**
- *
- *Application to benchmark the performance of cassandra output operator.
- *The operator was tested on following configuration:
- *Virtual Box with 10GB ram, 4 processor cores on an i7 machine with 16GB ram
- *The number of tuples processed per second were around 20,000
+ * Application to benchmark the performance of cassandra output operator.
+ * The operator was tested on following configuration:
+ * Virtual Box with 10GB ram, 4 processor cores on an i7 machine with 16GB ram
+ * The number of tuples processed per second were around 20,000
  *
  * @since 1.0.3
  */
 
-@ApplicationAnnotation(name="CassandraOperatorDemo")
+@ApplicationAnnotation(name = "CassandraOperatorDemo")
 public class CassandraOutputBenchmarkApplication implements StreamingApplication
 {
   private final Locality locality = null;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java
index 666746b..592d8a2 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java
@@ -18,34 +18,36 @@
  */
 package com.datatorrent.benchmark.cassandra;
 
-
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.exceptions.DriverException;
-import com.datatorrent.contrib.cassandra.AbstractCassandraTransactionableOutputOperator;
 
+import com.datatorrent.contrib.cassandra.AbstractCassandraTransactionableOutputOperator;
 
 /**
  * <p>CassandraOutputOperator class.</p>
  *
  * @since 1.0.3
  */
-public class CassandraOutputOperator extends  AbstractCassandraTransactionableOutputOperator<Integer>{
+public class CassandraOutputOperator extends AbstractCassandraTransactionableOutputOperator<Integer>
+{
 
   private int id = 0;
 
   @Override
-  protected PreparedStatement getUpdateCommand() {
+  protected PreparedStatement getUpdateCommand()
+  {
     String statement = "Insert into test.cassandra_operator(id, result) values (?,?);";
     return store.getSession().prepare(statement);
   }
 
   @Override
   protected Statement setStatementParameters(PreparedStatement updateCommand,
-      Integer tuple) throws DriverException {
+      Integer tuple) throws DriverException
+  {
     BoundStatement boundStmnt = new BoundStatement(updateCommand);
-    return boundStmnt.bind(id++,tuple);
+    return boundStmnt.bind(id++, tuple);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java
index 894cb75..ce0821c 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java
@@ -18,10 +18,12 @@
  */
 package com.datatorrent.benchmark.fs;
 
-import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
 import java.util.Arrays;
+
 import javax.validation.constraints.Min;
 
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
+
 /**
  * This output operator receives
  *
@@ -38,19 +40,20 @@ public class FSByteOutputOperator extends AbstractFileOutputOperator<byte[]>
   /**
    * The file a tuple is written out to is determined by modding the hashcode of the
    * tuple by the outputFileCount.
+   *
    * @param tuple The input tuple to write out.
    * @return The name of the file to write the tuple to.
    */
   @Override
   protected String getFileName(byte[] tuple)
   {
-    return ((Integer) (Arrays.hashCode(tuple) % outputFileCount)).toString();
+    return ((Integer)(Arrays.hashCode(tuple) % outputFileCount)).toString();
   }
 
   @Override
   protected byte[] getBytesForTuple(byte[] tuple)
   {
-    for(int counter = 0;
+    for (int counter = 0;
         counter < tuple.length;
         counter++) {
       tuple[counter] += 1;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java
index 8702ab8..7a63d18 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java
@@ -18,17 +18,20 @@
  */
 package com.datatorrent.benchmark.fs;
 
-import com.datatorrent.lib.testbench.RandomWordGenerator;
+import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Context.PortContext;
-import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.DAG;
+
+import com.datatorrent.api.StreamingApplication;
+
 import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.lib.counters.BasicCounters;
-import org.apache.commons.lang.mutable.MutableLong;
 
+import com.datatorrent.lib.counters.BasicCounters;
 
-import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.lib.testbench.RandomWordGenerator;
 
 /**
  * Application used to benchmark HDFS output operator
@@ -38,25 +41,28 @@ import org.apache.hadoop.conf.Configuration;
  * @since 0.9.4
  */
 
-@ApplicationAnnotation(name="HDFSOutputOperatorBenchmarkingApp")
+@ApplicationAnnotation(name = "HDFSOutputOperatorBenchmarkingApp")
 public class FSOutputOperatorBenchmark implements StreamingApplication
 {
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
     String filePath = "HDFSOutputOperatorBenchmarkingApp/"
-            + System.currentTimeMillis();
+        + System.currentTimeMillis();
 
     dag.setAttribute(DAG.STREAMING_WINDOW_SIZE_MILLIS, 1000);
 
     RandomWordGenerator wordGenerator = dag.addOperator("wordGenerator", RandomWordGenerator.class);
 
-    dag.getOperatorMeta("wordGenerator").getMeta(wordGenerator.output).getAttributes().put(PortContext.QUEUE_CAPACITY, 10000);
-    dag.getOperatorMeta("wordGenerator").getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT, 1);
+    dag.getOperatorMeta("wordGenerator").getMeta(wordGenerator.output)
+        .getAttributes().put(PortContext.QUEUE_CAPACITY, 10000);
+    dag.getOperatorMeta("wordGenerator").getAttributes()
+        .put(OperatorContext.APPLICATION_WINDOW_COUNT, 1);
 
     FSByteOutputOperator hdfsOutputOperator = dag.addOperator("hdfsOutputOperator", new FSByteOutputOperator());
     hdfsOutputOperator.setFilePath(filePath);
-    dag.getOperatorMeta("hdfsOutputOperator").getAttributes().put(OperatorContext.COUNTERS_AGGREGATOR, new BasicCounters.LongAggregator<MutableLong>());
+    dag.getOperatorMeta("hdfsOutputOperator").getAttributes()
+        .put(OperatorContext.COUNTERS_AGGREGATOR, new BasicCounters.LongAggregator<MutableLong>());
 
     dag.addStream("Generator2HDFSOutput", wordGenerator.output, hdfsOutputOperator.input);
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java
index 60be57d..95fa961 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java
@@ -30,16 +30,18 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
 
-import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator;
-import com.datatorrent.contrib.hive.HiveOperator;
-import com.datatorrent.contrib.hive.HiveStore;
-
 import com.datatorrent.api.Context.OperatorContext;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.InputOperator;
+
 import com.datatorrent.api.StreamingApplication;
+
 import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator;
+import com.datatorrent.contrib.hive.HiveOperator;
+import com.datatorrent.contrib.hive.HiveStore;
 
 /**
  * Application used to benchmark HIVE Insert operator
@@ -79,13 +81,14 @@ public class HiveInsertBenchmarkingApp implements StreamingApplication
   {
     HiveStore store = new HiveStore();
     store.setDatabaseUrl(conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl"));
-    store.setConnectionProperties(conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
+    store.setConnectionProperties(conf.get(
+        "dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
     store.setFilepath(conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.filepath"));
 
     try {
-      hiveInitializeDatabase(store, conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.tablename"));
-    }
-    catch (SQLException ex) {
+      hiveInitializeDatabase(store, conf.get(
+          "dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.tablename"));
+    } catch (SQLException ex) {
       LOG.debug(ex.getMessage());
     }
 
@@ -109,8 +112,9 @@ public class HiveInsertBenchmarkingApp implements StreamingApplication
   {
     hiveStore.connect();
     Statement stmt = hiveStore.getConnection().createStatement();
-    stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename + " (col1 string) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n'  \n"
-            + "STORED AS TEXTFILE ");
+    stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename
+        + " (col1 string) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n'  \n"
+        + "STORED AS TEXTFILE ");
     hiveStore.disconnect();
   }
 
@@ -171,8 +175,8 @@ public class HiveInsertBenchmarkingApp implements StreamingApplication
     {
 
       for (;
-              tupleCounter < tuplesPerWindow;
-              tupleCounter++) {
+          tupleCounter < tuplesPerWindow;
+          tupleCounter++) {
         String output = "2014-12-1" + random.nextInt(10) + "";
         outputString.emit(output);
       }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java
index cfbbfc5..98d9ce3 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java
@@ -24,23 +24,23 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map;
 
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
 
-import com.datatorrent.lib.testbench.RandomEventGenerator;
-
-import com.datatorrent.contrib.hive.*;
-
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
-
 import com.datatorrent.benchmark.RandomMapOutput;
 
+import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator;
+import com.datatorrent.contrib.hive.HiveOperator;
+import com.datatorrent.contrib.hive.HiveStore;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+
+
 /**
  * Application used to benchmark HIVE Map Insert operator
  * The DAG consists of random Event generator operator that is
@@ -61,12 +61,13 @@ public class HiveMapInsertBenchmarkingApp implements StreamingApplication
   {
     HiveStore store = new HiveStore();
     store.setDatabaseUrl(conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl"));
-    store.setConnectionProperties(conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
+    store.setConnectionProperties(conf.get(
+        "dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
     store.setFilepath(conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.filepath"));
     try {
-      hiveInitializeMapDatabase(store, conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.tablename"), ":");
-    }
-    catch (SQLException ex) {
+      hiveInitializeMapDatabase(store, conf.get(
+          "dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.tablename"), ":");
+    } catch (SQLException ex) {
       LOG.debug(ex.getMessage());
     }
     dag.setAttribute(DAG.STREAMING_WINDOW_SIZE_MILLIS, 1000);
@@ -90,13 +91,15 @@ public class HiveMapInsertBenchmarkingApp implements StreamingApplication
   /*
    * User can create table and specify data columns and partition columns in this function.
    */
-  public static void hiveInitializeMapDatabase(HiveStore hiveStore, String tablename, String delimiterMap) throws SQLException
+  public static void hiveInitializeMapDatabase(
+      HiveStore hiveStore, String tablename, String delimiterMap) throws SQLException
   {
     hiveStore.connect();
     Statement stmt = hiveStore.getConnection().createStatement();
-    stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename + " (col1 map<string,int>) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n'  \n"
-            + "MAP KEYS TERMINATED BY '" + delimiterMap + "' \n"
-            + "STORED AS TEXTFILE ");
+    stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename
+        + " (col1 map<string,int>) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n'  \n"
+        + "MAP KEYS TERMINATED BY '" + delimiterMap + "' \n"
+        + "STORED AS TEXTFILE ");
     hiveStore.disconnect();
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java
index 8239ea7..e147ad7 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java
@@ -18,11 +18,11 @@
  */
 package com.datatorrent.benchmark.kafka;
 
-import kafka.message.Message;
-
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.contrib.kafka.AbstractKafkaInputOperator;
 
+import kafka.message.Message;
+
 /**
  * This operator emits one constant message for each kafka message received.&nbsp;
  * So we can track the throughput by messages emitted per second in the stram platform.
@@ -38,7 +38,7 @@ public class BenchmarkKafkaInputOperator extends AbstractKafkaInputOperator
   /**
    * The output port on which messages are emitted.
    */
-  public transient DefaultOutputPort<String>  oport = new DefaultOutputPort<String>();
+  public transient DefaultOutputPort<String> oport = new DefaultOutputPort<String>();
 
   @Override
   protected void emitTuple(Message message)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
index 1126ac1..6353c37 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
@@ -18,7 +18,6 @@
  */
 package com.datatorrent.benchmark.kafka;
 
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
@@ -26,9 +25,6 @@ import java.util.Properties;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
 
 import javax.validation.constraints.Min;
 
@@ -37,21 +33,27 @@ import org.slf4j.LoggerFactory;
 
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultPartition;
+
 import com.datatorrent.api.InputOperator;
 import com.datatorrent.api.Operator.ActivationListener;
 import com.datatorrent.api.Partitioner;
 
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
 /**
  * This operator keep sending constant messages(1kb each) in {@link #threadNum} threads.&nbsp;
  * Messages are distributed evenly to partitions.
  * <p></p>
+ *
  * @displayName Benchmark Partitionable Kafka Output
  * @category Messaging
  * @tags output operator
- *
  * @since 0.9.3
  */
-public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<BenchmarkPartitionableKafkaOutputOperator>, InputOperator, ActivationListener<OperatorContext>
+public class BenchmarkPartitionableKafkaOutputOperator implements
+    Partitioner<BenchmarkPartitionableKafkaOutputOperator>, InputOperator, ActivationListener<OperatorContext>
 {
 
   private static final Logger logger = LoggerFactory.getLogger(BenchmarkPartitionableKafkaOutputOperator.class);
@@ -78,7 +80,8 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
 
   private int stickyKey = 0;
 
-  private transient Runnable r = new Runnable() {
+  private transient Runnable r = new Runnable()
+  {
 
     Producer<String, String> producer = null;
 
@@ -101,12 +104,12 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
       }
       long k = 0;
 
-      while (k<msgsSecThread || !controlThroughput) {
+      while (k < msgsSecThread || !controlThroughput) {
         long key = (stickyKey >= 0 ? stickyKey : k);
         k++;
         producer.send(new KeyedMessage<String, String>(topic, "" + key, new String(constantMsg)));
-        if(k==Long.MAX_VALUE){
-          k=0;
+        if (k == Long.MAX_VALUE) {
+          k = 0;
         }
       }
     }
@@ -152,10 +155,12 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
    * {@inheritDoc}
    */
   @Override
-  public Collection<Partition<BenchmarkPartitionableKafkaOutputOperator>> definePartitions(Collection<Partition<BenchmarkPartitionableKafkaOutputOperator>> partitions, PartitioningContext context)
+  public Collection<Partition<BenchmarkPartitionableKafkaOutputOperator>> definePartitions(
+      Collection<Partition<BenchmarkPartitionableKafkaOutputOperator>> partitions, PartitioningContext context)
   {
 
-    ArrayList<Partition<BenchmarkPartitionableKafkaOutputOperator>> newPartitions = new ArrayList<Partitioner.Partition<BenchmarkPartitionableKafkaOutputOperator>>(partitionCount);
+    ArrayList<Partition<BenchmarkPartitionableKafkaOutputOperator>> newPartitions =
+        new ArrayList<Partitioner.Partition<BenchmarkPartitionableKafkaOutputOperator>>(partitionCount);
 
     for (int i = 0; i < partitionCount; i++) {
       BenchmarkPartitionableKafkaOutputOperator bpkoo = new BenchmarkPartitionableKafkaOutputOperator();
@@ -163,7 +168,8 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
       bpkoo.setTopic(topic);
       bpkoo.setBrokerList(brokerList);
       bpkoo.setStickyKey(i);
-      Partition<BenchmarkPartitionableKafkaOutputOperator> p = new DefaultPartition<BenchmarkPartitionableKafkaOutputOperator>(bpkoo);
+      Partition<BenchmarkPartitionableKafkaOutputOperator> p =
+          new DefaultPartition<BenchmarkPartitionableKafkaOutputOperator>(bpkoo);
       newPartitions.add(p);
     }
     return newPartitions;
@@ -176,20 +182,17 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
     logger.info("Activate the benchmark kafka output operator .... ");
     constantMsg = new byte[msgSize];
     for (int i = 0; i < constantMsg.length; i++) {
-      constantMsg[i] = (byte) ('a' + i%26);
+      constantMsg[i] = (byte)('a' + i % 26);
     }
 
-
     for (int i = 0; i < threadNum; i++) {
-      if(controlThroughput){
+      if (controlThroughput) {
         ses.scheduleAtFixedRate(r, 0, 1, TimeUnit.SECONDS);
-      }
-      else {
+      } else {
         ses.submit(r);
       }
     }
 
-
   }
 
   @Override
@@ -268,7 +271,4 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
     this.stickyKey = stickyKey;
   }
 
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java
index 159ee60..ead6c66 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java
@@ -18,24 +18,27 @@
  */
 package com.datatorrent.benchmark.kafka;
 
-import com.google.common.collect.Sets;
 import java.util.Properties;
 
+
 import org.apache.hadoop.conf.Configuration;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Context.PortContext;
+
+import com.datatorrent.api.DAG;
+
 import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultInputPort;
+
+import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.contrib.kafka.HighlevelKafkaConsumer;
 import com.datatorrent.contrib.kafka.KafkaConsumer;
 import com.datatorrent.contrib.kafka.SimpleKafkaConsumer;
 
-
 /**
  * The stream app to test the benckmark of kafka
  * You can set the property file to make it using either {@link SimpleKafkaConsumer} or {@link HighlevelKafkaConsumer}
@@ -43,13 +46,14 @@ import com.datatorrent.contrib.kafka.SimpleKafkaConsumer;
  *
  * @since 0.9.3
  */
-@ApplicationAnnotation(name="KafkaInputBenchmark")
+@ApplicationAnnotation(name = "KafkaInputBenchmark")
 public class KafkaInputBenchmark implements StreamingApplication
 {
 
   public static class CollectorModule extends BaseOperator
   {
-    public final transient DefaultInputPort<String> inputPort = new DefaultInputPort<String>() {
+    public final transient DefaultInputPort<String> inputPort = new DefaultInputPort<String>()
+    {
 
       @Override
       public void process(String arg0)
@@ -65,12 +69,10 @@ public class KafkaInputBenchmark implements StreamingApplication
     dag.setAttribute(DAG.APPLICATION_NAME, "KafkaInputOperatorPartitionDemo");
     BenchmarkKafkaInputOperator bpkio = new BenchmarkKafkaInputOperator();
 
-
     String type = conf.get("kafka.consumertype", "simple");
 
     KafkaConsumer consumer = null;
 
-
     if (type.equals("highlevel")) {
       // Create template high-level consumer
 
@@ -96,7 +98,6 @@ public class KafkaInputBenchmark implements StreamingApplication
     dag.setAttribute(bpkio, OperatorContext.COUNTERS_AGGREGATOR, new KafkaConsumer.KafkaMeterStatsAggregator());
 //    dag.setAttribute(bpkio, OperatorContext.STATS_LISTENER, KafkaMeterStatsListener.class);
 
-
   }
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java
index ca1de48..0dd4352 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java
@@ -29,7 +29,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
  *
  * @since 0.9.3
  */
-@ApplicationAnnotation(name="KafkaOutputBenchmark")
+@ApplicationAnnotation(name = "KafkaOutputBenchmark")
 public class KafkaOutputBenchmark implements StreamingApplication
 {
 
@@ -37,7 +37,8 @@ public class KafkaOutputBenchmark implements StreamingApplication
   public void populateDAG(DAG dag, Configuration conf)
   {
     dag.setAttribute(DAG.APPLICATION_NAME, "KafkaOutputBenchmark");
-    BenchmarkPartitionableKafkaOutputOperator bpkoo = dag.addOperator("KafkaBenchmarkProducer", BenchmarkPartitionableKafkaOutputOperator.class);
+    BenchmarkPartitionableKafkaOutputOperator bpkoo = dag.addOperator(
+        "KafkaBenchmarkProducer", BenchmarkPartitionableKafkaOutputOperator.class);
     bpkoo.setBrokerList(conf.get("kafka.brokerlist"));
     bpkoo.setPartitionCount(2);
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java
index 1d22613..65601d5 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java
@@ -21,7 +21,6 @@ package com.datatorrent.benchmark.kafka;
 import kafka.producer.Partitioner;
 import kafka.utils.VerifiableProperties;
 
-
 /**
  * A simple partitioner class for test purpose
  * Key is a int string
@@ -32,12 +31,14 @@ import kafka.utils.VerifiableProperties;
  */
 public class KafkaTestPartitioner implements Partitioner
 {
-  public KafkaTestPartitioner (VerifiableProperties props) {
+  public KafkaTestPartitioner(VerifiableProperties props)
+  {
 
   }
+
   @Override
   public int partition(Object key, int num_Partitions)
   {
-    return Integer.parseInt((String)key)%num_Partitions;
+    return Integer.parseInt((String)key) % num_Partitions;
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java b/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java
index bc23404..b86cd01 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java
@@ -18,17 +18,20 @@
  */
 package com.datatorrent.benchmark.script;
 
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.DAG.Locality;
+
+import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
+
 import com.datatorrent.benchmark.RandomMapOutput;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
 import com.datatorrent.contrib.ruby.RubyOperator;
-import com.datatorrent.lib.testbench.RandomEventGenerator;
 
-import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
 
 /**
  *

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
index 07ab02e..7c45106 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
@@ -169,7 +169,8 @@ public class SpillableTestOperator extends BaseOperator implements Operator.Chec
     long countInPeriod = totalCount - lastTotalCount;
     long timeInPeriod = System.currentTimeMillis() - lastLogTime;
     long totalTime = System.currentTimeMillis() - beginTime;
-    logger.info("Statistics: total count: {}; period count: {}; total rate (per second): {}; period rate (per second): {}",
+    logger.info(
+        "Statistics: total count: {}; period count: {}; total rate (per second): {}; period rate (per second): {}",
         totalCount, countInPeriod, totalCount * 1000 / totalTime, countInPeriod * 1000 / timeInPeriod);
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
index ae5ba40..2dc6f0d 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
@@ -98,7 +98,8 @@ public class ManagedStateBenchmarkApp implements StreamingApplication
 
   public static class TestGenerator extends BaseOperator implements InputOperator
   {
-    public final transient DefaultOutputPort<KeyValPair<byte[], byte[]>> data = new DefaultOutputPort<KeyValPair<byte[], byte[]>>();
+    public final transient DefaultOutputPort<KeyValPair<byte[], byte[]>> data =
+        new DefaultOutputPort<KeyValPair<byte[], byte[]>>();
     int emitBatchSize = 1000;
     byte[] val = ByteBuffer.allocate(1000).putLong(1234).array();
     int rate = 20000;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
index 74ba658..60a775c 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
@@ -69,7 +69,8 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
   private ExecMode execMode = ExecMode.INSERT;
   private int timeRange = 1000 * 60;
 
-  public final transient DefaultInputPort<KeyValPair<byte[], byte[]>> input = new DefaultInputPort<KeyValPair<byte[], byte[]>>()
+  public final transient DefaultInputPort<KeyValPair<byte[], byte[]>> input =
+      new DefaultInputPort<KeyValPair<byte[], byte[]>>()
   {
     @Override
     public void process(KeyValPair<byte[], byte[]> tuple)
@@ -172,7 +173,8 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
   private final int taskBarrier = 100000;
 
   /**
-   * This method first send request of get to the state manager, then handle all the task(get) which already done and update the value.
+   * This method first send request of get to the state manager,
+   * then handle all the task(get) which already done and update the value.
    * @param tuple
    */
   private void updateAsync(KeyValPair<byte[], byte[]> tuple)
@@ -251,7 +253,8 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
     long spentTime = now - statisticsBeginTime;
     long totalSpentTime = now - applicationBeginTime;
     totalTupleCount += tupleCount;
-    logger.info("Windows: {}; Time Spent: {}, Processed tuples: {}, rate per second: {}; total rate: {}", windowCountPerStatistics, spentTime, tupleCount, tupleCount * 1000 / spentTime,
+    logger.info("Windows: {}; Time Spent: {}, Processed tuples: {}, rate per second: {}; total rate: {}",
+        windowCountPerStatistics, spentTime, tupleCount, tupleCount * 1000 / spentTime,
         totalTupleCount * 1000 / totalSpentTime);
 
     statisticsBeginTime = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java
index e0ee160..b0b7314 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java
@@ -18,13 +18,14 @@
  */
 package com.datatorrent.benchmark.stream;
 
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.stream.DevNullCounter;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  *
@@ -56,11 +57,11 @@ public class DevNullCounterBenchmark implements StreamingApplication
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
-   // RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
-   // rand.setMinvalue(0);
-   // rand.setMaxvalue(999999);
-   // rand.setTuplesBlastIntervalMillis(50);
-   // dag.getMeta(rand).getMeta(rand.integer_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+    // RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
+    // rand.setMinvalue(0);
+    // rand.setMaxvalue(999999);
+    // rand.setTuplesBlastIntervalMillis(50);
+    // dag.getMeta(rand).getMeta(rand.integer_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
     IntegerOperator intInput = dag.addOperator("intInput", new IntegerOperator());
     DevNullCounter oper = dag.addOperator("oper", new DevNullCounter());
     dag.getMeta(oper).getMeta(oper.data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java
index ff6ed76..c716206 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java
@@ -35,37 +35,42 @@ public class IntegerOperator implements InputOperator
    * Output port which emits integer.
    */
   public final transient DefaultOutputPort<Integer> integer_data = new DefaultOutputPort<Integer>();
+
   @Override
   public void emitTuples()
   {
     Integer i = 21;
-    for(int j=0;j<1000;j++){
-    integer_data.emit(i);
+    for (int j = 0; j < 1000; j++) {
+      integer_data.emit(i);
     }
   }
 
   @Override
   public void beginWindow(long windowId)
   {
-    //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    //throw new UnsupportedOperationException("Not supported yet.");
+    // To change body of generated methods, choose Tools | Templates.
   }
 
   @Override
   public void endWindow()
   {
-    //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    //throw new UnsupportedOperationException("Not supported yet.");
+    // To change body of generated methods, choose Tools | Templates.
   }
 
   @Override
   public void setup(OperatorContext context)
   {
-    //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    //throw new UnsupportedOperationException("Not supported yet.");
+    // To change body of generated methods, choose Tools | Templates.
   }
 
   @Override
   public void teardown()
   {
-    //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    //throw new UnsupportedOperationException("Not supported yet.");
+    // To change body of generated methods, choose Tools | Templates.
   }
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java
index 951b44b..2e5bcf9 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java
@@ -18,6 +18,8 @@
  */
 package com.datatorrent.benchmark.stream;
 
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
@@ -25,7 +27,6 @@ import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.stream.DevNull;
 import com.datatorrent.lib.stream.StreamDuplicater;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Benchmark App for StreamDuplicater Operator.
@@ -36,25 +37,25 @@ import org.apache.hadoop.conf.Configuration;
 @ApplicationAnnotation(name = "StreamDuplicaterApp")
 public class StreamDuplicaterApp implements StreamingApplication
 {
-   private final Locality locality = null;
-   public static final int QUEUE_CAPACITY = 16 * 1024;
+  private final Locality locality = null;
+  public static final int QUEUE_CAPACITY = 16 * 1024;
 
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
-   // RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
-   // rand.setMinvalue(0);
-   // rand.setMaxvalue(999999);
-   // rand.setTuplesBlastIntervalMillis(50);
-   // dag.getMeta(rand).getMeta(rand.integer_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+    // RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
+    // rand.setMinvalue(0);
+    // rand.setMaxvalue(999999);
+    // rand.setTuplesBlastIntervalMillis(50);
+    // dag.getMeta(rand).getMeta(rand.integer_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
     IntegerOperator intInput = dag.addOperator("intInput", new IntegerOperator());
     StreamDuplicater stream = dag.addOperator("stream", new StreamDuplicater());
     dag.getMeta(stream).getMeta(stream.data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
     dag.addStream("streamdup1", intInput.integer_data, stream.data).setLocality(locality);
     DevNull<Integer> dev1 = dag.addOperator("dev1", new DevNull());
     DevNull<Integer> dev2 = dag.addOperator("dev2", new DevNull());
-    dag.addStream("streamdup2",stream.out1,dev1.data).setLocality(locality);
-    dag.addStream("streamdup3",stream.out2,dev2.data).setLocality(locality);
+    dag.addStream("streamdup2", stream.out1, dev1.data).setLocality(locality);
+    dag.addStream("streamdup3", stream.out2, dev2.data).setLocality(locality);
 
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java
index d90320c..bb1d081 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java
@@ -18,6 +18,8 @@
  */
 package com.datatorrent.benchmark.stream;
 
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
@@ -25,7 +27,6 @@ import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.benchmark.WordCountOperator;
 import com.datatorrent.lib.stream.StreamMerger;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Benchmark App for StreamMerge Operator.
@@ -46,7 +47,7 @@ public class StreamMergeApp implements StreamingApplication
     StreamMerger stream = dag.addOperator("stream", new StreamMerger());
     dag.getMeta(stream).getMeta(stream.data1).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
     dag.getMeta(stream).getMeta(stream.data2).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
-    dag.addStream("streammerge1", intInput.integer_data, stream.data1,stream.data2).setLocality(locality);
+    dag.addStream("streammerge1", intInput.integer_data, stream.data1, stream.data2).setLocality(locality);
 
     WordCountOperator<Integer> counter = dag.addOperator("counter", new WordCountOperator<Integer>());
     dag.getMeta(counter).getMeta(counter.input).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java
index 419de18..b1ddbee 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java
@@ -18,6 +18,11 @@
  */
 package com.datatorrent.benchmark.testbench;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
@@ -25,9 +30,6 @@ import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.stream.DevNull;
 import com.datatorrent.lib.testbench.EventClassifier;
-import java.util.ArrayList;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Benchmark App for EventClassifier Operator.
@@ -75,7 +77,8 @@ public class EventClassifierApp implements StreamingApplication
     eventClassifier.setKeyMap(keymap);
     eventClassifier.setOperationReplace();
     eventClassifier.setKeyWeights(wmap);
-    dag.getMeta(eventClassifier).getMeta(eventClassifier.data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+    dag.getMeta(eventClassifier).getMeta(eventClassifier.data).getAttributes()
+        .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
     dag.addStream("eventtest1", hmapOper.hmap_data, eventClassifier.event).setLocality(locality);
     DevNull<HashMap<String, Double>> dev = dag.addOperator("dev", new DevNull());
     dag.addStream("eventtest2", eventClassifier.data, dev.data).setLocality(locality);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java
index a49b30e..5fe478b 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java
@@ -18,6 +18,10 @@
  */
 package com.datatorrent.benchmark.testbench;
 
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
@@ -26,8 +30,6 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.benchmark.WordCountOperator;
 import com.datatorrent.benchmark.stream.IntegerOperator;
 import com.datatorrent.lib.testbench.EventClassifierNumberToHashDouble;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Benchmark App for EventClassifierNumberToHashDouble Operator.
@@ -44,10 +46,14 @@ public class EventClassifierNumberToHashDoubleApp implements StreamingApplicatio
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
-    WordCountOperator<HashMap<String, Double>> counterString = dag.addOperator("counterString", new WordCountOperator<HashMap<String, Double>>());
-    dag.getMeta(counterString).getMeta(counterString.input).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
-    EventClassifierNumberToHashDouble eventClassify = dag.addOperator("eventClassify", new EventClassifierNumberToHashDouble());
-    dag.getMeta(eventClassify).getMeta(eventClassify.data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+    WordCountOperator<HashMap<String, Double>> counterString =
+        dag.addOperator("counterString", new WordCountOperator<HashMap<String, Double>>());
+    dag.getMeta(counterString).getMeta(counterString.input).getAttributes()
+        .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+    EventClassifierNumberToHashDouble eventClassify =
+        dag.addOperator("eventClassify", new EventClassifierNumberToHashDouble());
+    dag.getMeta(eventClassify).getMeta(eventClassify.data)
+        .getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
     IntegerOperator intInput = dag.addOperator("intInput", new IntegerOperator());
     dag.addStream("eventclassifier2", intInput.integer_data, eventClassify.event).setLocality(locality);
     dag.addStream("eventclassifier1", eventClassify.data, counterString.input).setLocality(locality);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java
index 3025c7e..8f28ae6 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java
@@ -18,6 +18,10 @@
  */
 package com.datatorrent.benchmark.testbench;
 
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
@@ -25,8 +29,6 @@ import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.stream.DevNull;
 import com.datatorrent.lib.testbench.EventGenerator;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Benchmark App for EventGenerator Operator.
@@ -44,7 +46,8 @@ public class EventGeneratorApp implements StreamingApplication
   public void populateDAG(DAG dag, Configuration conf)
   {
     EventGenerator eventGenerator = dag.addOperator("eventGenerator", new EventGenerator());
-    dag.getMeta(eventGenerator).getMeta(eventGenerator.count).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+    dag.getMeta(eventGenerator).getMeta(eventGenerator.count).getAttributes()
+        .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
 
     DevNull<String> devString = dag.addOperator("devString", new DevNull());
     DevNull<HashMap<String, Double>> devMap = dag.addOperator("devMap", new DevNull());

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java
index ea05d07..e562224 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java
@@ -18,15 +18,17 @@
  */
 package com.datatorrent.benchmark.testbench;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.stream.DevNull;
 import com.datatorrent.lib.testbench.EventIncrementer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Benchmark App for EventIncrementer Operator.
@@ -39,6 +41,7 @@ public class EventIncrementerApp implements StreamingApplication
 {
   private final Locality locality = null;
   public static final int QUEUE_CAPACITY = 16 * 1024;
+
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
@@ -55,15 +58,14 @@ public class EventIncrementerApp implements StreamingApplication
     eventInc.setKeylimits(keys, low, high);
     eventInc.setDelta(1);
     HashMapOperator hmapOper = dag.addOperator("hmapOper", new HashMapOperator());
-    dag.addStream("eventIncInput1",hmapOper.hmapList_data,eventInc.seed);
-    dag.addStream("eventIncInput2",hmapOper.hmapMap_data,eventInc.increment);
-    DevNull<HashMap<String,Integer>> dev1= dag.addOperator("dev1", new DevNull());
-    DevNull<HashMap<String,String>> dev2= dag.addOperator("dev2", new DevNull());
-    dag.addStream("eventIncOutput1",eventInc.count,dev1.data).setLocality(locality);
-    dag.addStream("eventIncOutput2",eventInc.data,dev2.data).setLocality(locality);
+    dag.addStream("eventIncInput1", hmapOper.hmapList_data, eventInc.seed);
+    dag.addStream("eventIncInput2", hmapOper.hmapMap_data, eventInc.increment);
+    DevNull<HashMap<String, Integer>> dev1 = dag.addOperator("dev1", new DevNull());
+    DevNull<HashMap<String, String>> dev2 = dag.addOperator("dev2", new DevNull());
+    dag.addStream("eventIncOutput1", eventInc.count, dev1.data).setLocality(locality);
+    dag.addStream("eventIncOutput2", eventInc.data, dev2.data).setLocality(locality);
 
   }
 
-
 }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java
index 915e6f0..ea2943f 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java
@@ -18,15 +18,17 @@
  */
 package com.datatorrent.benchmark.testbench;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.stream.DevNull;
 import com.datatorrent.lib.testbench.FilterClassifier;
-import java.util.ArrayList;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Benchmark App for FilterClassifier Operator.
@@ -39,6 +41,7 @@ public class FilterClassifierApp implements StreamingApplication
 {
   private final Locality locality = null;
   public static final int QUEUE_CAPACITY = 16 * 1024;
+
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
@@ -80,9 +83,9 @@ public class FilterClassifierApp implements StreamingApplication
     filter.setTotalFilter(100);
 
     HashMapOperator hmapOper = dag.addOperator("hmapOper", new HashMapOperator());
-    DevNull<HashMap<String,Double>> dev = dag.addOperator("dev",  new DevNull());
-    dag.addStream("filter1",hmapOper.hmap_data,filter.data).setLocality(locality);
-    dag.addStream("filer2",filter.filter,dev.data).setLocality(locality);
+    DevNull<HashMap<String, Double>> dev = dag.addOperator("dev", new DevNull());
+    dag.addStream("filter1", hmapOper.hmap_data, filter.data).setLocality(locality);
+    dag.addStream("filer2", filter.filter, dev.data).setLocality(locality);
 
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java
index c3d996e..52c0bed 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java
@@ -18,15 +18,17 @@
  */
 package com.datatorrent.benchmark.testbench;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.stream.DevNull;
 import com.datatorrent.lib.testbench.FilteredEventClassifier;
-import java.util.ArrayList;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Benchmark App for FilteredEventClassifier Operator.