You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/08/26 21:10:20 UTC

[2/6] apex-malhar git commit: Fixed checkstyle errors for demos.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReduceOperator.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReduceOperator.java
index 32c7ccb..5df9b0d 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReduceOperator.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReduceOperator.java
@@ -25,14 +25,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
@@ -47,129 +48,142 @@ import com.datatorrent.lib.util.KeyHashValPair;
  * @since 0.9.0
  */
 @SuppressWarnings({ "deprecation", "unused" })
-public class ReduceOperator<K1, V1, K2, V2> implements Operator {
-	private static final Logger logger = LoggerFactory.getLogger(ReduceOperator.class);
-
-	private Class<? extends Reducer<K1, V1, K2, V2>> reduceClass;
-	private transient Reducer<K1, V1, K2, V2> reduceObj;
-	private transient Reporter reporter;
-	private OutputCollector<K2, V2> outputCollector;
-	private String configFile;
-
-	public Class<? extends Reducer<K1, V1, K2, V2>> getReduceClass() {
-		return reduceClass;
-	}
-
-	public void setReduceClass(Class<? extends Reducer<K1, V1, K2, V2>> reduceClass) {
-		this.reduceClass = reduceClass;
-	}
-
-	public String getConfigFile() {
-		return configFile;
-	}
-
-	public void setConfigFile(String configFile) {
-		this.configFile = configFile;
-	}
-
-	private int numberOfMappersRunning = -1;
-	private int operatorId;
-
-	public final transient DefaultInputPort<KeyHashValPair<Integer, Integer>> inputCount = new DefaultInputPort<KeyHashValPair<Integer, Integer>>() {
-		@Override
-		public void process(KeyHashValPair<Integer, Integer> tuple) {
-			logger.info("processing {}", tuple);
-			if (numberOfMappersRunning == -1)
-				numberOfMappersRunning = tuple.getValue();
-			else
-				numberOfMappersRunning += tuple.getValue();
-
-		}
-
-	};
-
-	public final transient DefaultOutputPort<KeyHashValPair<K2, V2>> output = new DefaultOutputPort<KeyHashValPair<K2, V2>>();
-	private Map<K1, List<V1>> cacheObject;
-	public final transient DefaultInputPort<KeyHashValPair<K1, V1>> input = new DefaultInputPort<KeyHashValPair<K1, V1>>() {
-
-		@Override
-		public void process(KeyHashValPair<K1, V1> tuple) {
-			// logger.info("processing tupple {}",tuple);
-			List<V1> list = cacheObject.get(tuple.getKey());
-			if (list == null) {
-				list = new ArrayList<V1>();
-				list.add(tuple.getValue());
-				cacheObject.put(tuple.getKey(), list);
-			} else {
-				list.add(tuple.getValue());
-			}
-		}
-
-	};
-
-	@Override
-	public void setup(OperatorContext context) {
-		reporter = new ReporterImpl(ReporterType.Reducer, new Counters());
-		if(context != null){
-			operatorId = context.getId();
-		}
-		cacheObject = new HashMap<K1, List<V1>>();
-		outputCollector = new OutputCollectorImpl<K2, V2>();
-		if (reduceClass != null) {
-			try {
-				reduceObj = reduceClass.newInstance();
-			} catch (Exception e) {
-				logger.info("can't instantiate object {}", e.getMessage());
-				throw new RuntimeException(e);
-			}
-			Configuration conf = new Configuration();
-			InputStream stream = null;
-			if (configFile != null && configFile.length() > 0) {
-				logger.info("system /{}", configFile);
-				stream = ClassLoader.getSystemResourceAsStream("/" + configFile);
-				if (stream == null) {
-					logger.info("system {}", configFile);
-					stream = ClassLoader.getSystemResourceAsStream(configFile);
-				}
-			}
-			if (stream != null) {
-				logger.info("found our stream... so adding it");
-				conf.addResource(stream);
-			}
-			reduceObj.configure(new JobConf(conf));
-		}
-
-	}
-
-	@Override
-	public void teardown() {
-
-	}
-
-	@Override
-	public void beginWindow(long windowId) {
-
-	}
-
-	@Override
-	public void endWindow() {
-		if (numberOfMappersRunning == 0) {
-			for (Map.Entry<K1, List<V1>> e : cacheObject.entrySet()) {
-				try {
-					reduceObj.reduce(e.getKey(), e.getValue().iterator(), outputCollector, reporter);
-				} catch (IOException e1) {
-					logger.info(e1.getMessage());
-					throw new RuntimeException(e1);
-				}
-			}
-			List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>) outputCollector).getList();
-			for (KeyHashValPair<K2, V2> e : list) {
-				output.emit(e);				
-			}
-			list.clear();
-			cacheObject.clear();
-			numberOfMappersRunning = -1;
-		}
-	}
+public class ReduceOperator<K1, V1, K2, V2> implements Operator
+{
+  private static final Logger logger = LoggerFactory.getLogger(ReduceOperator.class);
+
+  private Class<? extends Reducer<K1, V1, K2, V2>> reduceClass;
+  private transient Reducer<K1, V1, K2, V2> reduceObj;
+  private transient Reporter reporter;
+  private OutputCollector<K2, V2> outputCollector;
+  private String configFile;
+
+  public Class<? extends Reducer<K1, V1, K2, V2>> getReduceClass()
+  {
+    return reduceClass;
+  }
+
+  public void setReduceClass(Class<? extends Reducer<K1, V1, K2, V2>> reduceClass)
+  {
+    this.reduceClass = reduceClass;
+  }
+
+  public String getConfigFile()
+  {
+    return configFile;
+  }
+
+  public void setConfigFile(String configFile)
+  {
+    this.configFile = configFile;
+  }
+
+  private int numberOfMappersRunning = -1;
+  private int operatorId;
+
+  public final transient DefaultInputPort<KeyHashValPair<Integer, Integer>> inputCount = new DefaultInputPort<KeyHashValPair<Integer, Integer>>()
+  {
+    @Override
+    public void process(KeyHashValPair<Integer, Integer> tuple)
+    {
+      logger.info("processing {}", tuple);
+      if (numberOfMappersRunning == -1) {
+        numberOfMappersRunning = tuple.getValue();
+      } else {
+        numberOfMappersRunning += tuple.getValue();
+      }
+
+    }
+
+  };
+
+  public final transient DefaultOutputPort<KeyHashValPair<K2, V2>> output = new DefaultOutputPort<KeyHashValPair<K2, V2>>();
+  private Map<K1, List<V1>> cacheObject;
+  public final transient DefaultInputPort<KeyHashValPair<K1, V1>> input = new DefaultInputPort<KeyHashValPair<K1, V1>>()
+  {
+    @Override
+    public void process(KeyHashValPair<K1, V1> tuple)
+    {
+      // logger.info("processing tupple {}",tuple);
+      List<V1> list = cacheObject.get(tuple.getKey());
+      if (list == null) {
+        list = new ArrayList<V1>();
+        list.add(tuple.getValue());
+        cacheObject.put(tuple.getKey(), list);
+      } else {
+        list.add(tuple.getValue());
+      }
+    }
+
+  };
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    reporter = new ReporterImpl(ReporterType.Reducer, new Counters());
+    if (context != null) {
+      operatorId = context.getId();
+    }
+    cacheObject = new HashMap<K1, List<V1>>();
+    outputCollector = new OutputCollectorImpl<K2, V2>();
+    if (reduceClass != null) {
+      try {
+        reduceObj = reduceClass.newInstance();
+      } catch (Exception e) {
+        logger.info("can't instantiate object {}", e.getMessage());
+        throw new RuntimeException(e);
+      }
+      Configuration conf = new Configuration();
+      InputStream stream = null;
+      if (configFile != null && configFile.length() > 0) {
+        logger.info("system /{}", configFile);
+        stream = ClassLoader.getSystemResourceAsStream("/" + configFile);
+        if (stream == null) {
+          logger.info("system {}", configFile);
+          stream = ClassLoader.getSystemResourceAsStream(configFile);
+        }
+      }
+      if (stream != null) {
+        logger.info("found our stream... so adding it");
+        conf.addResource(stream);
+      }
+      reduceObj.configure(new JobConf(conf));
+    }
+
+  }
+
+  @Override
+  public void teardown()
+  {
+
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+
+  }
+
+  @Override
+  public void endWindow()
+  {
+    if (numberOfMappersRunning == 0) {
+      for (Map.Entry<K1, List<V1>> e : cacheObject.entrySet()) {
+        try {
+          reduceObj.reduce(e.getKey(), e.getValue().iterator(), outputCollector, reporter);
+        } catch (IOException e1) {
+          logger.info(e1.getMessage());
+          throw new RuntimeException(e1);
+        }
+      }
+      List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>)outputCollector).getList();
+      for (KeyHashValPair<K2, V2> e : list) {
+        output.emit(e);
+      }
+      list.clear();
+      cacheObject.clear();
+      numberOfMappersRunning = -1;
+    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReporterImpl.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReporterImpl.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReporterImpl.java
index 1eb3bdd..d2d38da 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReporterImpl.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReporterImpl.java
@@ -18,8 +18,8 @@
  */
 package com.datatorrent.demos.mroperator;
 
-import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.Reporter;
 
@@ -28,81 +28,92 @@ import org.apache.hadoop.mapred.Reporter;
  *
  * @since 0.9.0
  */
-public class ReporterImpl implements Reporter {
-
-	private Counters counters;
-	InputSplit inputSplit;
-
-	public enum ReporterType {
-		Mapper, Reducer
-	}
-
-	private ReporterType typ;
-
-	public ReporterImpl(final ReporterType kind, final Counters ctrs) {
-		this.typ = kind;
-		this.counters = ctrs;
-	}
-
-	@Override
-	public InputSplit getInputSplit() {
-		if (typ == ReporterType.Reducer) {
-			throw new UnsupportedOperationException("Reducer cannot call getInputSplit()");
-		} else {
-			return inputSplit;
-		}
-	}
-
-	public void setInputSplit(InputSplit inputSplit) {
-		this.inputSplit = inputSplit;
-	}
-
-	@Override
-	public void incrCounter(Enum<?> key, long amount) {
-		if (null != counters) {
-			counters.incrCounter(key, amount);
-		}
-	}
-
-	@Override
-	public void incrCounter(String group, String counter, long amount) {
-		if (null != counters) {
-			counters.incrCounter(group, counter, amount);
-		}
-	}
-
-	@Override
-	public void setStatus(String status) {
-		// do nothing.
-	}
-
-	@Override
-	public void progress() {
-		// do nothing.
-	}
-
-	@Override
-	public Counter getCounter(String group, String name) {
-		Counters.Counter counter = null;
-		if (counters != null) {
-			counter = counters.findCounter(group, name);
-		}
-
-		return counter;
-	}
-
-	@Override
-	public Counter getCounter(Enum<?> key) {
-		Counters.Counter counter = null;
-		if (counters != null) {
-			counter = counters.findCounter(key);
-		}
-
-		return counter;
-	}
-
-	public float getProgress() {
-		return 0;
-	}
+public class ReporterImpl implements Reporter
+{
+  private Counters counters;
+  InputSplit inputSplit;
+
+  public enum ReporterType
+  {
+    Mapper, Reducer
+  }
+
+  private ReporterType typ;
+
+  public ReporterImpl(final ReporterType kind, final Counters ctrs)
+  {
+    this.typ = kind;
+    this.counters = ctrs;
+  }
+
+  @Override
+  public InputSplit getInputSplit()
+  {
+    if (typ == ReporterType.Reducer) {
+      throw new UnsupportedOperationException("Reducer cannot call getInputSplit()");
+    } else {
+      return inputSplit;
+    }
+  }
+
+  public void setInputSplit(InputSplit inputSplit)
+  {
+    this.inputSplit = inputSplit;
+  }
+
+  @Override
+  public void incrCounter(Enum<?> key, long amount)
+  {
+    if (null != counters) {
+      counters.incrCounter(key, amount);
+    }
+  }
+
+  @Override
+  public void incrCounter(String group, String counter, long amount)
+  {
+    if (null != counters) {
+      counters.incrCounter(group, counter, amount);
+    }
+  }
+
+  @Override
+  public void setStatus(String status)
+  {
+    // do nothing.
+  }
+
+  @Override
+  public void progress()
+  {
+    // do nothing.
+  }
+
+  @Override
+  public Counter getCounter(String group, String name)
+  {
+    Counters.Counter counter = null;
+    if (counters != null) {
+      counter = counters.findCounter(group, name);
+    }
+
+    return counter;
+  }
+
+  @Override
+  public Counter getCounter(Enum<?> key)
+  {
+    Counters.Counter counter = null;
+    if (counters != null) {
+      counter = counters.findCounter(key);
+    }
+
+    return counter;
+  }
+
+  public float getProgress()
+  {
+    return 0;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/WordCount.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/WordCount.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/WordCount.java
index d5cbdb0..f78cf99 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/WordCount.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/WordCount.java
@@ -19,13 +19,24 @@
 package com.datatorrent.demos.mroperator;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.Iterator;
+import java.util.StringTokenizer;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.*;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
 
 /**
  * <p>WordCount class.</p>
@@ -38,7 +49,7 @@ public class WordCount
 
   public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>
   {
-    private final static IntWritable one = new IntWritable(1);
+    private static final IntWritable one = new IntWritable(1);
     private Text word = new Text();
 
     public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/MapOperatorTest.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/MapOperatorTest.java b/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/MapOperatorTest.java
index e8c71c3..0f330e8 100644
--- a/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/MapOperatorTest.java
+++ b/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/MapOperatorTest.java
@@ -23,6 +23,15 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -35,13 +44,6 @@ import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TextInputFormat;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.testbench.CollectorTestSink;
 
@@ -118,8 +120,7 @@ public class MapOperatorTest
       testDir = baseDir + "/" + methodName;
       try {
         FileUtils.forceMkdir(new File(testDir));
-      }
-      catch (IOException ex) {
+      } catch (IOException ex) {
         throw new RuntimeException(ex);
       }
       createFile(testDir + "/" + file1, "1\n2\n3\n1\n2\n3\n");
@@ -131,16 +132,13 @@ public class MapOperatorTest
       try {
         output = new BufferedWriter(new FileWriter(new File(fileName)));
         output.write(data);
-      }
-      catch (IOException ex) {
+      } catch (IOException ex) {
         throw new RuntimeException(ex);
-      }
-      finally {
+      } finally {
         if (output != null) {
           try {
             output.close();
-          }
-          catch (IOException ex) {
+          } catch (IOException ex) {
             LOG.error("not able to close the output stream: ", ex);
           }
         }
@@ -152,8 +150,7 @@ public class MapOperatorTest
     {
       try {
         FileUtils.deleteDirectory(new File(baseDir));
-      }
-      catch (IOException ex) {
+      } catch (IOException ex) {
         throw new RuntimeException(ex);
       }
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/ReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/ReduceOperatorTest.java b/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/ReduceOperatorTest.java
index 9ad5637..b85f8ad 100644
--- a/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/ReduceOperatorTest.java
+++ b/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/ReduceOperatorTest.java
@@ -18,55 +18,57 @@
  */
 package com.datatorrent.demos.mroperator;
 
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+
 import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.datatorrent.lib.util.KeyHashValPair;
 
-public class ReduceOperatorTest {
-
-	 private static Logger logger = LoggerFactory.getLogger(ReduceOperatorTest.class);
-
-	/**
-	 * Test node logic emits correct results
-	 */
-	@Test
-	public void testNodeProcessing() throws Exception {
-		testNodeProcessingSchema(new ReduceOperator<Text, IntWritable,Text, IntWritable>());
-	}
+public class ReduceOperatorTest
+{
+  private static Logger logger = LoggerFactory.getLogger(ReduceOperatorTest.class);
 
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public void testNodeProcessingSchema(ReduceOperator<Text, IntWritable,Text, IntWritable> oper) {
+  /**
+   * Test node logic emits correct results
+   */
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    testNodeProcessingSchema(new ReduceOperator<Text, IntWritable,Text, IntWritable>());
+  }
 
-		oper.setReduceClass(WordCount.Reduce.class);
-		oper.setConfigFile(null);
-		oper.setup(null);
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void testNodeProcessingSchema(ReduceOperator<Text, IntWritable,Text, IntWritable> oper)
+  {
+    oper.setReduceClass(WordCount.Reduce.class);
+    oper.setConfigFile(null);
+    oper.setup(null);
 
-		CollectorTestSink sortSink = new CollectorTestSink();
+    CollectorTestSink sortSink = new CollectorTestSink();
     oper.output.setSink(sortSink);
 
-		oper.beginWindow(0);
-		oper.inputCount.process(new KeyHashValPair<Integer, Integer>(1, 1));
-		oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1)));
-		oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1)));
-		oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1)));
-		oper.endWindow();
+    oper.beginWindow(0);
+    oper.inputCount.process(new KeyHashValPair<Integer, Integer>(1, 1));
+    oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1)));
+    oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1)));
+    oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1)));
+    oper.endWindow();
 
-		oper.beginWindow(1);
-		oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1)));
-		oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1)));
-		oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1)));
-		oper.inputCount.process(new KeyHashValPair<Integer, Integer>(1, -1));
-		oper.endWindow();
-		Assert.assertEquals("number emitted tuples", 2, sortSink.collectedTuples.size());
-		for (Object o : sortSink.collectedTuples) {
+    oper.beginWindow(1);
+    oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1)));
+    oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1)));
+    oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1)));
+    oper.inputCount.process(new KeyHashValPair<Integer, Integer>(1, -1));
+    oper.endWindow();
+    Assert.assertEquals("number emitted tuples", 2, sortSink.collectedTuples.size());
+    for (Object o : sortSink.collectedTuples) {
       logger.debug(o.toString());
     }
     logger.debug("Done testing round\n");
-	}
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/WordCountMRApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/WordCountMRApplicationTest.java b/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/WordCountMRApplicationTest.java
index cb1521a..bd732c1 100644
--- a/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/WordCountMRApplicationTest.java
+++ b/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/WordCountMRApplicationTest.java
@@ -23,14 +23,15 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
 import com.google.common.collect.Maps;
 
 import com.datatorrent.api.LocalMode;
@@ -58,7 +59,7 @@ public class WordCountMRApplicationTest
     List<String> readLines = FileUtils.readLines(new File(testMeta.testDir + "/output.txt"));
     Map<String,Integer> readMap = Maps.newHashMap();
     Iterator<String> itr = readLines.iterator();
-    while(itr.hasNext()){
+    while (itr.hasNext()) {
       String[] splits = itr.next().split("=");
       readMap.put(splits[0],Integer.valueOf(splits[1]));
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java
----------------------------------------------------------------------
diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java
index 8f4dd92..55ffe92 100644
--- a/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java
+++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java
@@ -20,13 +20,12 @@ package com.datatorrent.demos.pi;
 
 import org.apache.hadoop.conf.Configuration;
 
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.testbench.RandomEventGenerator;
-
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
 
 /**
  * Monte Carlo PI estimation demo : <br>
@@ -75,7 +74,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
  *
  * @since 0.3.2
  */
-@ApplicationAnnotation(name="PiDemo")
+@ApplicationAnnotation(name = "PiDemo")
 public class Application implements StreamingApplication
 {
   private final Locality locality = null;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java
----------------------------------------------------------------------
diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java
index 57c5249..328bb10 100644
--- a/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java
+++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java
@@ -84,7 +84,7 @@ import com.datatorrent.lib.testbench.RandomEventGenerator;
  *
  * @since 0.3.2
  */
-@ApplicationAnnotation(name="PiDemoAppData")
+@ApplicationAnnotation(name = "PiDemoAppData")
 public class ApplicationAppData implements StreamingApplication
 {
   public static final String SNAPSHOT_SCHEMA = "PiDemoDataSchema.json";

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationWithScript.java
----------------------------------------------------------------------
diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationWithScript.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationWithScript.java
index 3ed376f..0796608 100644
--- a/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationWithScript.java
+++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationWithScript.java
@@ -18,16 +18,15 @@
  */
 package com.datatorrent.demos.pi;
 
-
 import org.apache.hadoop.conf.Configuration;
 
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
 import com.datatorrent.lib.script.JavaScriptOperator;
 import com.datatorrent.lib.stream.RoundRobinHashMap;
 import com.datatorrent.lib.testbench.RandomEventGenerator;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
 
 /**
  * Monte Carlo PI estimation demo : <br>
@@ -78,7 +77,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
  *
  * @since 0.3.2
  */
-@ApplicationAnnotation(name="PiJavaScriptDemo")
+@ApplicationAnnotation(name = "PiJavaScriptDemo")
 public class ApplicationWithScript implements StreamingApplication
 {
 
@@ -92,13 +91,13 @@ public class ApplicationWithScript implements StreamingApplication
     rand.setMaxvalue(maxValue);
 
     RoundRobinHashMap<String,Object> rrhm = dag.addOperator("rrhm", new RoundRobinHashMap<String, Object>());
-    rrhm.setKeys(new String[] { "x", "y" });
+    rrhm.setKeys(new String[]{"x", "y"});
 
     JavaScriptOperator calc = dag.addOperator("picalc", new JavaScriptOperator());
     calc.setPassThru(false);
     calc.put("i",0);
     calc.put("count",0);
-    calc.addSetupScript("function pi() { if (x*x+y*y <= "+maxValue*maxValue+") { i++; } count++; return i / count * 4; }");
+    calc.addSetupScript("function pi() { if (x*x+y*y <= " + maxValue * maxValue + ") { i++; } count++; return i / count * 4; }");
 
     calc.setInvoke("pi");
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/pi/src/main/java/com/datatorrent/demos/pi/Calculator.java
----------------------------------------------------------------------
diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/Calculator.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/Calculator.java
index 9363b88..221ecc0 100644
--- a/demos/pi/src/main/java/com/datatorrent/demos/pi/Calculator.java
+++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/Calculator.java
@@ -40,7 +40,7 @@ import com.datatorrent.lib.testbench.RandomEventGenerator;
  *
  * @since 0.3.2
  */
-@ApplicationAnnotation(name="PiLibraryDemo")
+@ApplicationAnnotation(name = "PiLibraryDemo")
 public class Calculator implements StreamingApplication
 {
   @Override
@@ -56,7 +56,7 @@ public class Calculator implements StreamingApplication
     AbstractAggregator<Integer> pairOperator = dag.addOperator("PairXY", new ArrayListAggregator<Integer>());
     Sigma<Integer> sumOperator = dag.addOperator("SumXY", new Sigma<Integer>());
     LogicalCompareToConstant<Integer> comparator = dag.addOperator("AnalyzeLocation", new LogicalCompareToConstant<Integer>());
-    comparator.setConstant(30000 *30000);
+    comparator.setConstant(30000 * 30000);
     Counter inCircle = dag.addOperator("CountInCircle", Counter.class);
     Counter inSquare = dag.addOperator("CountInSquare", Counter.class);
     Division division = dag.addOperator("Ratio", Division.class);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java
----------------------------------------------------------------------
diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java
index ce5ef9d..c50e17e 100644
--- a/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java
+++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java
@@ -25,10 +25,10 @@ import java.util.Map;
 
 import javax.validation.constraints.NotNull;
 
-import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * <p>An operator which converts a raw value to a named value singleton list.</p>
@@ -47,9 +47,11 @@ public class NamedValueList<T> extends BaseOperator
   private List<Map<String, T>> valueList;
   private Map<String, T> valueMap;
 
-  public final transient DefaultInputPort<T> inPort = new DefaultInputPort<T>() {
+  public final transient DefaultInputPort<T> inPort = new DefaultInputPort<T>()
+  {
     @Override
-    public void process(T val) {
+    public void process(T val)
+    {
       valueMap.put(valueName, val);
       outPort.emit(valueList);
     }
@@ -80,11 +82,13 @@ public class NamedValueList<T> extends BaseOperator
   {
   }
 
-  public String getValueName() {
+  public String getValueName()
+  {
     return valueName;
   }
 
-  public void setValueName(String name) {
+  public void setValueName(String name)
+  {
     valueName = name;
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/pi/src/main/java/com/datatorrent/demos/pi/PiCalculateOperator.java
----------------------------------------------------------------------
diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/PiCalculateOperator.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/PiCalculateOperator.java
index 14edf19..8e61991 100644
--- a/demos/pi/src/main/java/com/datatorrent/demos/pi/PiCalculateOperator.java
+++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/PiCalculateOperator.java
@@ -21,10 +21,10 @@ package com.datatorrent.demos.pi;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * This operator implements Monte Carlo estimation of pi. For points randomly distributed points on
@@ -46,8 +46,7 @@ public class PiCalculateOperator extends BaseOperator
     {
       if (x == -1) {
         x = tuple;
-      }
-      else {
+      } else {
         y = tuple;
         if (x * x + y * y <= base) {
           inArea++;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/pi/src/test/java/com/datatorrent/demos/pi/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/pi/src/test/java/com/datatorrent/demos/pi/ApplicationTest.java b/demos/pi/src/test/java/com/datatorrent/demos/pi/ApplicationTest.java
index b61077a..d8881c2 100644
--- a/demos/pi/src/test/java/com/datatorrent/demos/pi/ApplicationTest.java
+++ b/demos/pi/src/test/java/com/datatorrent/demos/pi/ApplicationTest.java
@@ -18,13 +18,11 @@
  */
 package com.datatorrent.demos.pi;
 
-
-import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.LocalMode;
 
-
 /**
  *
  */
@@ -33,12 +31,12 @@ public class ApplicationTest
   @Test
   public void testSomeMethod() throws Exception
   {
-	  LocalMode lma = LocalMode.newInstance();
-	    Configuration conf =new Configuration(false);
-	    conf.addResource("dt-site-pi.xml");
-	    lma.prepareDAG(new Application(), conf);
-	    LocalMode.Controller lc = lma.getController();
-	    lc.run(10000);
-  
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.addResource("dt-site-pi.xml");
+    lma.prepareDAG(new Application(), conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.run(10000);
+
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java
----------------------------------------------------------------------
diff --git a/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java b/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java
index cd52873..8e12fcc 100644
--- a/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java
+++ b/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java
@@ -18,9 +18,8 @@
  */
 package com.datatorrent.demos.pi;
 
-
-import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.LocalMode;
 
@@ -33,7 +32,7 @@ public class CalculatorTest
   public void testSomeMethod() throws Exception
   { 
     LocalMode lma = LocalMode.newInstance();
-    Configuration conf =new Configuration(false);
+    Configuration conf = new Configuration(false);
     conf.addResource("dt-site-pilibrary.xml");
     lma.prepareDAG(new Calculator(), conf);
     LocalMode.Controller lc = lma.getController();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulRScript.java
----------------------------------------------------------------------
diff --git a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulRScript.java b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulRScript.java
index 8558554..cf49848 100755
--- a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulRScript.java
+++ b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulRScript.java
@@ -25,11 +25,10 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.contrib.r.RScript;
-
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.contrib.r.RScript;
 
 /**
  * @since 2.1.0
@@ -52,7 +51,8 @@ public class FaithfulRScript extends RScript
   }
 
   @InputPortFieldAnnotation(optional = true)
-  public final transient DefaultInputPort<FaithfulKey> faithfulInput = new DefaultInputPort<FaithfulKey>() {
+  public final transient DefaultInputPort<FaithfulKey> faithfulInput = new DefaultInputPort<FaithfulKey>()
+  {
     @Override
     public void process(FaithfulKey tuple)
     {
@@ -65,7 +65,8 @@ public class FaithfulRScript extends RScript
   };
 
   @InputPortFieldAnnotation(optional = true)
-  public final transient DefaultInputPort<Integer> inputElapsedTime = new DefaultInputPort<Integer>() {
+  public final transient DefaultInputPort<Integer> inputElapsedTime = new DefaultInputPort<Integer>()
+  {
     @Override
     public void process(Integer eT)
     {
@@ -82,9 +83,9 @@ public class FaithfulRScript extends RScript
   @Override
   public void endWindow()
   {
-
-    if (readingsList.size() == 0)
+    if (readingsList.size() == 0) {
       return;
+    }
     LOG.info("Input data size: readingsList - " + readingsList.size());
 
     double[] eruptionDuration = new double[readingsList.size()];
@@ -106,6 +107,5 @@ public class FaithfulRScript extends RScript
     super.process(map);
     readingsList.clear();
     map.clear();
-
-  };
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/InputGenerator.java
----------------------------------------------------------------------
diff --git a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/InputGenerator.java b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/InputGenerator.java
index 86abba7..c45cd50 100755
--- a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/InputGenerator.java
+++ b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/InputGenerator.java
@@ -82,8 +82,9 @@ public class InputGenerator implements InputOperator
   {
     int id;
     do {
-      id = (int) Math.abs(Math.round(random.nextGaussian() * max));
-    } while (id >= max);
+      id = (int)Math.abs(Math.round(random.nextGaussian() * max));
+    }
+    while (id >= max);
 
     if (id < min) {
       id = min;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplication.java
----------------------------------------------------------------------
diff --git a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplication.java b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplication.java
index 400e80c..0483767 100755
--- a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplication.java
+++ b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplication.java
@@ -23,11 +23,10 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
 
 /**
  * The application attempts to simulate 'Old Faithful Geyser" eruption.
@@ -38,7 +37,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
  * waiting times and eruption duration values.
  * For every application window, it generates only one 'elapsed time' input for which the
  * prediction would be made.
- * Model in R is in file ruptionModel.R located at 
+ * Model in R is in file ruptionModel.R located at
  * demos/r/src/main/resources/com/datatorrent/demos/oldfaithful/ directory
  *
  * @since 2.1.0

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/r/src/test/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/r/src/test/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplicationTest.java b/demos/r/src/test/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplicationTest.java
index dc6a8cb..0bb1901 100755
--- a/demos/r/src/test/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplicationTest.java
+++ b/demos/r/src/test/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplicationTest.java
@@ -29,7 +29,7 @@ import com.datatorrent.api.LocalMode;
 
 public class OldFaithfulApplicationTest
 {
-  
+
   private static final Logger LOG = LoggerFactory.getLogger(OldFaithfulApplicationTest.class);
 
   @Test

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/KinesisHashtagsApplication.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/KinesisHashtagsApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/KinesisHashtagsApplication.java
index fd2a430..b9d32ab 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/KinesisHashtagsApplication.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/KinesisHashtagsApplication.java
@@ -18,6 +18,9 @@
  */
 package com.datatorrent.demos.twitter;
 
+import java.net.URI;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.Operator.InputPort;
@@ -31,10 +34,7 @@ import com.datatorrent.contrib.twitter.TwitterSampleInput;
 import com.datatorrent.lib.algo.UniqueCounter;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
 import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 
-import java.net.URI;
 /**
  * Twitter Demo Application: <br>
  * This demo application samples random public status from twitter, send to Hashtag
@@ -167,7 +167,7 @@ import java.net.URI;
  *
  * @since 2.0.0
  */
-@ApplicationAnnotation(name="TwitterKinesisDemo")
+@ApplicationAnnotation(name = "TwitterKinesisDemo")
 public class KinesisHashtagsApplication implements StreamingApplication
 {
   private final Locality locality = null;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/SlidingContainer.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/SlidingContainer.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/SlidingContainer.java
index 9bd81a4..8b9f447 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/SlidingContainer.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/SlidingContainer.java
@@ -32,7 +32,7 @@ public class SlidingContainer<T> implements Serializable
   T identifier;
   int totalCount;
   int position;
-  int windowedCount[];
+  int[] windowedCount;
 
   @SuppressWarnings("unused")
   private SlidingContainer()

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpApplication.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpApplication.java
index f61f5be..9edce64 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpApplication.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpApplication.java
@@ -24,8 +24,6 @@ import java.sql.SQLException;
 import javax.annotation.Nonnull;
 
 import org.apache.hadoop.conf.Configuration;
-import twitter4j.Status;
-
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
@@ -34,6 +32,8 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.contrib.twitter.TwitterSampleInput;
 import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;
 
+import twitter4j.Status;
+
 /**
  * An application which connects to Twitter Sample Input and stores all the
  * tweets with their usernames in a mysql database. Please review the docs
@@ -63,7 +63,7 @@ import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;
  *
  * @since 0.9.4
  */
-@ApplicationAnnotation(name="TwitterDumpDemo")
+@ApplicationAnnotation(name = "TwitterDumpDemo")
 public class TwitterDumpApplication implements StreamingApplication
 {
   public static class Status2Database extends AbstractJdbcTransactionableOutputOperator<Status>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpHBaseApplication.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpHBaseApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpHBaseApplication.java
index ecc412f..3adbbe0 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpHBaseApplication.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpHBaseApplication.java
@@ -23,14 +23,14 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Put;
 
-import com.datatorrent.contrib.hbase.AbstractHBasePutOutputOperator;
-import com.datatorrent.contrib.twitter.TwitterSampleInput;
-
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 
+import com.datatorrent.contrib.hbase.AbstractHBasePutOutputOperator;
+import com.datatorrent.contrib.twitter.TwitterSampleInput;
+
 import twitter4j.Status;
 
 /**
@@ -47,7 +47,7 @@ import twitter4j.Status;
  *
  * @since 1.0.2
  */
-@ApplicationAnnotation(name="TwitterDumpHBaseDemo")
+@ApplicationAnnotation(name = "TwitterDumpHBaseDemo")
 public class TwitterDumpHBaseApplication implements StreamingApplication
 {
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusHashtagExtractor.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusHashtagExtractor.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusHashtagExtractor.java
index 5ed6774..d22db40 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusHashtagExtractor.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusHashtagExtractor.java
@@ -18,12 +18,12 @@
  */
 package com.datatorrent.demos.twitter;
 
-import twitter4j.HashtagEntity;
-import twitter4j.Status;
-
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+import twitter4j.HashtagEntity;
+import twitter4j.Status;
 
 /**
  * <p>TwitterStatusHashtagExtractor class.</p>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusURLExtractor.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusURLExtractor.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusURLExtractor.java
index ed4e207..6dbc436 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusURLExtractor.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusURLExtractor.java
@@ -18,12 +18,13 @@
  */
 package com.datatorrent.demos.twitter;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
 import twitter4j.Status;
 import twitter4j.URLEntity;
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusWordExtractor.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusWordExtractor.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusWordExtractor.java
index 1818dca..e05a37a 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusWordExtractor.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusWordExtractor.java
@@ -18,14 +18,14 @@
  */
 package com.datatorrent.demos.twitter;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Context.OperatorContext;
-
 import java.util.Arrays;
 import java.util.HashSet;
 
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
 /**
  * <p>TwitterStatusWordExtractor class.</p>
  *
@@ -41,7 +41,7 @@ public class TwitterStatusWordExtractor extends BaseOperator
     @Override
     public void process(String text)
     {
-      String strs[] = text.split(" ");
+      String[] strs = text.split(" ");
       if (strs != null) {
         for (String str : strs) {
           if (str != null && !filterList.contains(str) ) {
@@ -56,7 +56,7 @@ public class TwitterStatusWordExtractor extends BaseOperator
   public void setup(OperatorContext context)
   {
     this.filterList = new HashSet<String>(Arrays.asList(new String[]{"", " ","I","you","the","a","to","as","he","him","his","her","she","me","can","for","of","and","or","but",
-           "this","that","!",",",".",":","#","/","@","be","in","out","was","were","is","am","are","so","no","...","my","de","RT","on","que","la","i","your","it","have","with","?","when",
-    "up","just","do","at","&","-","+","*","\\","y","n","like","se","en","te","el","I'm"}));
+      "this","that","!",",",".",":","#","/","@","be","in","out","was","were","is","am","are","so","no","...","my","de","RT","on","que","la","i","your","it","have","with","?","when",
+      "up","just","do","at","&","-","+","*","\\","y","n","like","se","en","te","el","I'm"}));
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java
index c8d3b00..731a38f 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java
@@ -19,10 +19,14 @@
 package com.datatorrent.demos.twitter;
 
 import java.net.URI;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 
+import com.google.common.collect.Maps;
+
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
@@ -35,15 +39,10 @@ import com.datatorrent.contrib.twitter.TwitterSampleInput;
 import com.datatorrent.lib.algo.UniqueCounter;
 import com.datatorrent.lib.appdata.schemas.SchemaUtils;
 import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
 import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery;
 import com.datatorrent.lib.io.PubSubWebSocketAppDataResult;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
 
-import com.google.common.collect.Maps;
-
-import java.util.List;
-import java.util.Map;
 /**
  * Twitter Demo Application: <br>
  * This demo application samples random public status from twitter, send to url
@@ -147,7 +146,7 @@ import java.util.Map;
  *
  * @since 0.3.2
  */
-@ApplicationAnnotation(name=TwitterTopCounterApplication.APP_NAME)
+@ApplicationAnnotation(name = TwitterTopCounterApplication.APP_NAME)
 public class TwitterTopCounterApplication implements StreamingApplication
 {
   public static final String SNAPSHOT_SCHEMA = "twitterURLDataSchema.json";
@@ -188,11 +187,7 @@ public class TwitterTopCounterApplication implements StreamingApplication
     consoleOutput(dag, "topURLs", topCounts.output, SNAPSHOT_SCHEMA, "url");
   }
 
-  public static void consoleOutput(DAG dag,
-                                   String operatorName,
-                                   OutputPort<List<Map<String, Object>>> topCount,
-                                   String schemaFile,
-                                   String alias)
+  public static void consoleOutput(DAG dag, String operatorName, OutputPort<List<Map<String, Object>>> topCount, String schemaFile, String alias)
   {
     String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
     if (!StringUtils.isEmpty(gatewayAddress)) {
@@ -217,8 +212,7 @@ public class TwitterTopCounterApplication implements StreamingApplication
 
       dag.addStream("MapProvider", topCount, snapshotServer.input);
       dag.addStream("Result", snapshotServer.queryResult, queryResultPort);
-    }
-    else {
+    } else {
       ConsoleOutputOperator operator = dag.addOperator(operatorName, new ConsoleOutputOperator());
       operator.setStringFormat(operatorName + ": %s");
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopWordsApplication.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopWordsApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopWordsApplication.java
index 8ed3678..3953ab7 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopWordsApplication.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopWordsApplication.java
@@ -18,6 +18,7 @@
  */
 package com.datatorrent.demos.twitter;
 
+import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
@@ -25,9 +26,6 @@ import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.contrib.twitter.TwitterSampleInput;
 import com.datatorrent.lib.algo.UniqueCounter;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-
 
 /**
  * This application is same as other twitter demo
@@ -43,7 +41,7 @@ import org.apache.hadoop.conf.Configuration;
  *
  * @since 0.3.2
  */
-@ApplicationAnnotation(name=TwitterTopWordsApplication.APP_NAME)
+@ApplicationAnnotation(name = TwitterTopWordsApplication.APP_NAME)
 public class TwitterTopWordsApplication implements StreamingApplication
 {
   public static final String SNAPSHOT_SCHEMA = "twitterWordDataSchema.json";

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTrendingHashtagsApplication.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTrendingHashtagsApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTrendingHashtagsApplication.java
index 5246060..3597a92 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTrendingHashtagsApplication.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTrendingHashtagsApplication.java
@@ -18,16 +18,13 @@
  */
 package com.datatorrent.demos.twitter;
 
+import org.apache.hadoop.conf.Configuration;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.contrib.twitter.TwitterSampleInput;
 import com.datatorrent.lib.algo.UniqueCounter;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-
-
 
 /**
  * Twitter Demo Application: <br>
@@ -134,7 +131,7 @@ import org.apache.hadoop.conf.Configuration;
  *
  * @since 1.0.2
  */
-@ApplicationAnnotation(name=TwitterTrendingHashtagsApplication.APP_NAME)
+@ApplicationAnnotation(name = TwitterTrendingHashtagsApplication.APP_NAME)
 public class TwitterTrendingHashtagsApplication implements StreamingApplication
 {
   public static final String SNAPSHOT_SCHEMA = "twitterHashTagDataSchema.json";

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/URLSerDe.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/URLSerDe.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/URLSerDe.java
index 7f6f399..43ed8f7 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/URLSerDe.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/URLSerDe.java
@@ -18,11 +18,11 @@
  */
 package com.datatorrent.demos.twitter;
 
-import com.datatorrent.api.StreamCodec;
-import com.datatorrent.netlet.util.Slice;
 import java.nio.ByteBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.netlet.util.Slice;
 
 /**
  * <p>URLSerDe class.</p>
@@ -42,11 +42,9 @@ public class URLSerDe implements StreamCodec<byte[]>
   {
     if (fragment == null || fragment.buffer == null) {
       return null;
-    }
-    else if (fragment.offset == 0 && fragment.length == fragment.buffer.length) {
+    } else if (fragment.offset == 0 && fragment.length == fragment.buffer.length) {
       return fragment.buffer;
-    }
-    else {
+    } else {
       byte[] buffer = new byte[fragment.buffer.length];
       System.arraycopy(fragment.buffer, fragment.offset, buffer, 0, fragment.length);
       return buffer;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java
index 449c903..20bb673 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java
@@ -18,23 +18,25 @@
  */
 package com.datatorrent.demos.twitter;
 
-import java.util.*;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.api.*;
-import com.datatorrent.api.Context.OperatorContext;
-
-import com.datatorrent.common.util.BaseOperator;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
 /**
  *
  * WindowedTopCounter is an operator which counts the most often occurring tuples in a sliding window of a specific size.
@@ -114,8 +116,7 @@ public class WindowedTopCounter<T> extends BaseOperator
 
       if (holder.totalCount == 0) {
         iterator.remove();
-      }
-      else {
+      } else {
         topCounter.add(holder);
         if (--i == 0) {
           break;
@@ -138,8 +139,7 @@ public class WindowedTopCounter<T> extends BaseOperator
           topCounter.poll();
           topCounter.add(holder);
           smallest = topCounter.peek().totalCount;
-        }
-        else if (holder.totalCount == 0) {
+        } else if (holder.totalCount == 0) {
           iterator.remove();
         }
       }
@@ -149,7 +149,7 @@ public class WindowedTopCounter<T> extends BaseOperator
 
     Iterator<SlidingContainer<T>> topIter = topCounter.iterator();
 
-    while(topIter.hasNext()) {
+    while (topIter.hasNext()) {
       final SlidingContainer<T> wh = topIter.next();
       Map<String, Object> tableRow = Maps.newHashMap();
 
@@ -254,8 +254,7 @@ public class WindowedTopCounter<T> extends BaseOperator
     {
       if (o1.totalCount > o2.totalCount) {
         return 1;
-      }
-      else if (o1.totalCount < o2.totalCount) {
+      } else if (o1.totalCount < o2.totalCount) {
         return -1;
       }
 
@@ -274,8 +273,8 @@ public class WindowedTopCounter<T> extends BaseOperator
     @Override
     public int compare(Map<String, Object> o1, Map<String, Object> o2)
     {
-      Integer count1 = (Integer) o1.get(FIELD_COUNT);
-      Integer count2 = (Integer) o2.get(FIELD_COUNT);
+      Integer count1 = (Integer)o1.get(FIELD_COUNT);
+      Integer count2 = (Integer)o2.get(FIELD_COUNT);
 
       return count1.compareTo(count2);
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterDumpApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterDumpApplicationTest.java b/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterDumpApplicationTest.java
index a4daf09..cd211ff 100644
--- a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterDumpApplicationTest.java
+++ b/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterDumpApplicationTest.java
@@ -19,10 +19,11 @@
 package com.datatorrent.demos.twitter;
 
 import org.junit.Test;
-import static org.junit.Assert.*;
 
 import org.apache.hadoop.conf.Configuration;
 
+import static org.junit.Assert.assertEquals;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.LocalMode;
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopCounterTest.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopCounterTest.java b/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopCounterTest.java
index 0ad4d18..91a4e20 100644
--- a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopCounterTest.java
+++ b/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopCounterTest.java
@@ -18,11 +18,10 @@
  */
 package com.datatorrent.demos.twitter;
 
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
 import com.datatorrent.api.LocalMode;
 import com.datatorrent.contrib.twitter.TwitterSampleInput;
-import com.datatorrent.demos.twitter.TwitterTopCounterApplication;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Test;
 
 /**
  * Test the DAG declaration in local mode.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopWordsTest.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopWordsTest.java b/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopWordsTest.java
index a27c60f..4ac2e8d 100644
--- a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopWordsTest.java
+++ b/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopWordsTest.java
@@ -18,12 +18,11 @@
  */
 package com.datatorrent.demos.twitter;
 
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
 import com.datatorrent.api.LocalMode;
 import com.datatorrent.contrib.twitter.TwitterSampleInput;
 
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Test;
-
 /**
  * Test the DAG declaration in local mode.
  */
@@ -38,9 +37,9 @@ public class TwitterTopWordsTest
   @Test
   public void testApplication() throws Exception
   {
-	TwitterTopWordsApplication app = new TwitterTopWordsApplication();
-	Configuration conf =new Configuration(false);
-	conf.addResource("dt-site-rollingtopwords.xml");
+    TwitterTopWordsApplication app = new TwitterTopWordsApplication();
+    Configuration conf = new Configuration(false);
+    conf.addResource("dt-site-rollingtopwords.xml");
     LocalMode lma = LocalMode.newInstance();
     lma.prepareDAG(app, conf);
     LocalMode.Controller lc = lma.getController();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java
index 1b27cea..57ef1a1 100644
--- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java
+++ b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java
@@ -18,7 +18,6 @@
  */
 package com.datatorrent.demos.uniquecount;
 
-
 import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.Context;
@@ -27,6 +26,7 @@ import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 
+import com.datatorrent.common.partitioner.StatelessPartitioner;
 import com.datatorrent.lib.algo.UniqueCounter;
 import com.datatorrent.lib.converter.MapToKeyHashValuePairConverter;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
@@ -34,8 +34,6 @@ import com.datatorrent.lib.stream.Counter;
 import com.datatorrent.lib.stream.StreamDuplicater;
 import com.datatorrent.lib.util.KeyHashValPair;
 
-import com.datatorrent.common.partitioner.StatelessPartitioner;
-
 /**
  * Application to demonstrate PartitionableUniqueCount operator. <br>
  * The input operator generate random keys, which is sent to
@@ -45,7 +43,7 @@ import com.datatorrent.common.partitioner.StatelessPartitioner;
  *
  * @since 1.0.2
  */
-@ApplicationAnnotation(name="UniqueValueCountDemo")
+@ApplicationAnnotation(name = "UniqueValueCountDemo")
 public class Application implements StreamingApplication
 {
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/CountVerifier.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/CountVerifier.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/CountVerifier.java
index 3a5140d..d201037 100644
--- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/CountVerifier.java
+++ b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/CountVerifier.java
@@ -18,6 +18,9 @@
  */
 package com.datatorrent.demos.uniquecount;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
@@ -25,9 +28,6 @@ import com.datatorrent.api.Operator;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.lib.util.KeyHashValPair;
 
-import java.util.HashMap;
-import java.util.Map;
-
 /*
 Compare results and print non-matching values to console.
  */
@@ -41,35 +41,33 @@ public class CountVerifier<K> implements Operator
   HashMap<K, Integer> map1 = new HashMap<K, Integer>();
   HashMap<K, Integer> map2 = new HashMap<K, Integer>();
 
-  public transient final DefaultInputPort<KeyHashValPair<K, Integer>> in1 =
-      new DefaultInputPort<KeyHashValPair<K, Integer>>()
-      {
-        @Override
-        public void process(KeyHashValPair<K, Integer> tuple)
-        {
-          processTuple(tuple, map1);
-        }
-      };
+  public final transient DefaultInputPort<KeyHashValPair<K, Integer>> in1 = new DefaultInputPort<KeyHashValPair<K, Integer>>()
+  {
+    @Override
+    public void process(KeyHashValPair<K, Integer> tuple)
+    {
+      processTuple(tuple, map1);
+    }
+  };
 
-  public transient final DefaultInputPort<KeyHashValPair<K, Integer>> in2 =
-      new DefaultInputPort<KeyHashValPair<K, Integer>>()
-      {
-        @Override
-        public void process(KeyHashValPair<K, Integer> tuple)
-        {
-          processTuple(tuple, map2);
-        }
-      };
+  public final transient DefaultInputPort<KeyHashValPair<K, Integer>> in2 = new DefaultInputPort<KeyHashValPair<K, Integer>>()
+  {
+    @Override
+    public void process(KeyHashValPair<K, Integer> tuple)
+    {
+      processTuple(tuple, map2);
+    }
+  };
 
   void processTuple(KeyHashValPair<K, Integer> tuple, HashMap<K, Integer> map)
   {
     map.put(tuple.getKey(), tuple.getValue());
   }
 
-  @OutputPortFieldAnnotation(optional=true)
-  public transient final DefaultOutputPort<Integer> successPort = new DefaultOutputPort<Integer>();
-  @OutputPortFieldAnnotation(optional=true)
-  public transient final DefaultOutputPort<Integer> failurePort = new DefaultOutputPort<Integer>();
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Integer> successPort = new DefaultOutputPort<Integer>();
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Integer> failurePort = new DefaultOutputPort<Integer>();
 
   @Override
   public void beginWindow(long l)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomDataGenerator.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomDataGenerator.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomDataGenerator.java
index 2742961..e806759 100644
--- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomDataGenerator.java
+++ b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomDataGenerator.java
@@ -18,14 +18,17 @@
  */
 package com.datatorrent.demos.uniquecount;
 
+import java.util.HashMap;
+import java.util.Random;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.InputOperator;
 import com.datatorrent.lib.util.KeyValPair;
 
-import java.util.HashMap;
-import java.util.Random;
-
 /**
  * Generate random Key value pairs.
  * key is string and value is int, it emits the pair as KeyValPair on outPort,
@@ -36,6 +39,7 @@ public class RandomDataGenerator implements InputOperator
 {
   public final transient DefaultOutputPort<KeyValPair<String, Object>> outPort = new DefaultOutputPort<KeyValPair<String, Object>>();
   private HashMap<String, Integer> dataInfo;
+  private final transient Logger LOG = LoggerFactory.getLogger(RandomDataGenerator.class);
   private int count;
   private int sleepMs = 10;
   private int keyRange = 100;
@@ -51,15 +55,15 @@ public class RandomDataGenerator implements InputOperator
   @Override
   public void emitTuples()
   {
-    for(int i = 0 ; i < tupleBlast; i++) {
+    for (int i = 0; i < tupleBlast; i++) {
       String key = String.valueOf(random.nextInt(keyRange));
       int val = random.nextInt(valueRange);
       outPort.emit(new KeyValPair<String, Object>(key, val));
     }
     try {
       Thread.sleep(sleepMs);
-    } catch(Exception ex) {
-      System.out.println(ex.getMessage());
+    } catch (Exception ex) {
+      LOG.error(ex.getMessage());
     }
     count++;
   }
@@ -93,7 +97,7 @@ public class RandomDataGenerator implements InputOperator
   @Override
   public void endWindow()
   {
-    System.out.println("emitTuples called  " + count + " times in this window");
+    LOG.debug("emitTuples called  " + count + " times in this window");
     count = 0;
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeyValues.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeyValues.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeyValues.java
index feeb282..28f3bc0 100644
--- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeyValues.java
+++ b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeyValues.java
@@ -18,16 +18,16 @@
  */
 package com.datatorrent.demos.uniquecount;
 
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
 import java.util.BitSet;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
 /**
  * Input port operator for generating random values on keys. <br>
  * Key(s)   : key + integer in range between 0 and numKeys <br>
@@ -37,107 +37,117 @@ import java.util.Random;
  */
 public class RandomKeyValues implements InputOperator
 {
-	public final transient DefaultOutputPort<KeyValPair<String, Object>> outport = new DefaultOutputPort<KeyValPair<String, Object>>();
-	private Random random = new Random(11111);
-    private int numKeys;
-    private int numValuesPerKeys;
-    private int tuppleBlast = 1000;
-    private int emitDelay = 20; /* 20 ms */
-
-    /* For verification */
-    private Map<Integer, BitSet> history = new HashMap<Integer, BitSet>();
-
-    public RandomKeyValues() {
-        this.numKeys = 100;
-        this.numValuesPerKeys = 100;
-    }
-
-    public RandomKeyValues(int keys, int values) {
-        this.numKeys = keys;
-        this.numValuesPerKeys = values;
-    }
-
-    @Override
-	public void beginWindow(long windowId)
-	{
-	}
-
-	@Override
-	public void endWindow()
-	{
-	}
-
-	@Override
-	public void setup(OperatorContext context)
-	{
-	}
-
-	@Override
-	public void teardown()
-	{
-	}
-
-	@Override
-	public void emitTuples()
-	{
-        /* generate tuples randomly, */
-        for(int i = 0; i < tuppleBlast; i++) {
-            int intKey = random.nextInt(numKeys);
-            String key = "key" + String.valueOf(intKey);
-            int value = random.nextInt(numValuesPerKeys);
-
-            // update history for verifying later.
-            BitSet bmap = history.get(intKey);
-            if (bmap == null) {
-                bmap = new BitSet();
-                history.put(intKey, bmap);
-            }
-            bmap.set(value);
-
-            // emit the key with value.
-            outport.emit(new KeyValPair<String, Object>(key, value));
-        }
-		try
-		{
-			Thread.sleep(emitDelay);
-		} catch (Exception e)
-		{
-		}
-	}
-
-    public int getNumKeys() {
-        return numKeys;
-    }
-
-    public void setNumKeys(int numKeys) {
-        this.numKeys = numKeys;
-    }
-
-    public int getNumValuesPerKeys() {
-        return numValuesPerKeys;
-    }
-
-    public void setNumValuesPerKeys(int numValuesPerKeys) {
-        this.numValuesPerKeys = numValuesPerKeys;
-    }
-
-    public int getTuppleBlast() {
-        return tuppleBlast;
+  public final transient DefaultOutputPort<KeyValPair<String, Object>> outport = new DefaultOutputPort<KeyValPair<String, Object>>();
+  private Random random = new Random(11111);
+  private int numKeys;
+  private int numValuesPerKeys;
+  private int tuppleBlast = 1000;
+  private int emitDelay = 20; /* 20 ms */
+
+  /* For verification */
+  private Map<Integer, BitSet> history = new HashMap<Integer, BitSet>();
+
+  public RandomKeyValues()
+  {
+    this.numKeys = 100;
+    this.numValuesPerKeys = 100;
+  }
+
+  public RandomKeyValues(int keys, int values)
+  {
+    this.numKeys = keys;
+    this.numValuesPerKeys = values;
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+  }
+
+  @Override
+  public void endWindow()
+  {
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    /* generate tuples randomly, */
+    for (int i = 0; i < tuppleBlast; i++) {
+      int intKey = random.nextInt(numKeys);
+      String key = "key" + String.valueOf(intKey);
+      int value = random.nextInt(numValuesPerKeys);
+
+      // update history for verifying later.
+      BitSet bmap = history.get(intKey);
+      if (bmap == null) {
+        bmap = new BitSet();
+        history.put(intKey, bmap);
+      }
+      bmap.set(value);
+
+      // emit the key with value.
+      outport.emit(new KeyValPair<String, Object>(key, value));
     }
-
-    public void setTuppleBlast(int tuppleBlast) {
-        this.tuppleBlast = tuppleBlast;
-    }
-
-    public int getEmitDelay() {
-        return emitDelay;
-    }
-
-    public void setEmitDelay(int emitDelay) {
-        this.emitDelay = emitDelay;
-    }
-
-    public void debug() {
-
+    try {
+      Thread.sleep(emitDelay);
+    } catch (Exception e) {
+      // Ignore.
     }
+  }
+
+  public int getNumKeys()
+  {
+    return numKeys;
+  }
+
+  public void setNumKeys(int numKeys)
+  {
+    this.numKeys = numKeys;
+  }
+
+  public int getNumValuesPerKeys()
+  {
+    return numValuesPerKeys;
+  }
+
+  public void setNumValuesPerKeys(int numValuesPerKeys)
+  {
+    this.numValuesPerKeys = numValuesPerKeys;
+  }
+
+  public int getTuppleBlast()
+  {
+    return tuppleBlast;
+  }
+
+  public void setTuppleBlast(int tuppleBlast)
+  {
+    this.tuppleBlast = tuppleBlast;
+  }
+
+  public int getEmitDelay()
+  {
+    return emitDelay;
+  }
+
+  public void setEmitDelay(int emitDelay)
+  {
+    this.emitDelay = emitDelay;
+  }
+
+  public void debug()
+  {
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeysGenerator.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeysGenerator.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeysGenerator.java
index 65b5c95..eb9d22c 100644
--- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeysGenerator.java
+++ b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeysGenerator.java
@@ -18,14 +18,18 @@
  */
 package com.datatorrent.demos.uniquecount;
 
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.InputOperator;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.lib.util.KeyHashValPair;
-import org.apache.commons.lang3.mutable.MutableInt;
-
-import java.util.*;
 
 /*
     Generate random keys.
@@ -61,8 +65,7 @@ public class RandomKeysGenerator implements InputOperator
       outPort.emit(key);
 
 
-      if (verificationPort.isConnected())
-      {
+      if (verificationPort.isConnected()) {
         // maintain history for later verification.
         MutableInt count = history.get(key);
         if (count == null) {
@@ -74,10 +77,11 @@ public class RandomKeysGenerator implements InputOperator
 
     }
     try {
-      if (sleepTime != 0)
+      if (sleepTime != 0) {
         Thread.sleep(sleepTime);
+      }
     } catch (Exception ex) {
-
+      // Ignore.
     }
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java
index 95323d5..eb9e392 100644
--- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java
+++ b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java
@@ -18,7 +18,6 @@
  */
 package com.datatorrent.demos.uniquecount;
 
-
 import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.Context;
@@ -27,19 +26,19 @@ import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+
 import com.datatorrent.lib.algo.UniqueCounter;
 import com.datatorrent.lib.converter.MapToKeyHashValuePairConverter;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
 import com.datatorrent.lib.util.KeyValPair;
 
-import com.datatorrent.common.partitioner.StatelessPartitioner;
-
 /**
  * <p>UniqueKeyValCountDemo class.</p>
  *
  * @since 1.0.2
  */
-@ApplicationAnnotation(name="UniqueKeyValueCountDemo")
+@ApplicationAnnotation(name = "UniqueKeyValueCountDemo")
 public class UniqueKeyValCountDemo implements StreamingApplication
 {
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/ApplicationTest.java b/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/ApplicationTest.java
index 991a94d..66a0af1 100644
--- a/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/ApplicationTest.java
+++ b/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/ApplicationTest.java
@@ -18,9 +18,9 @@
  */
 package com.datatorrent.demos.uniquecount;
 
-import com.datatorrent.api.LocalMode;
-import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
 
 /**
  * Test the DAG declaration in local mode.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/UniqueKeyValDemoTest.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/UniqueKeyValDemoTest.java b/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/UniqueKeyValDemoTest.java
index 01e790a..a198247 100644
--- a/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/UniqueKeyValDemoTest.java
+++ b/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/UniqueKeyValDemoTest.java
@@ -18,9 +18,9 @@
  */
 package com.datatorrent.demos.uniquecount;
 
-import com.datatorrent.api.LocalMode;
-import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
 
 /**
  * Test the DAG declaration in local mode.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/Application.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/Application.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/Application.java
index 1028080..d0512cf 100644
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/Application.java
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/Application.java
@@ -18,14 +18,14 @@
  */
 package com.datatorrent.demos.wordcount;
 
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.api.StreamingApplication;
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.algo.UniqueCounter;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
 
-import org.apache.hadoop.conf.Configuration;
-
 /**
  * Simple Word Count Demo : <br>
  * This is application to count total occurrence of each word from file or any
@@ -72,8 +72,8 @@ import org.apache.hadoop.conf.Configuration;
  * Streaming Window Size : 500ms
  * Operator Details : <br>
  * <ul>
- * 	<li>
- *     <p><b> The operator wordinput : </b> This operator opens local file, reads each line and sends each word to application.
+ * <li>
+ * <p><b> The operator wordinput : </b> This operator opens local file, reads each line and sends each word to application.
  *         This can replaced by any input stream by user. <br>
  *     Class : {@link com.datatorrent.demos.wordcount.WordCountInputOperator}  <br>
  *     Operator Application Window Count : 1 <br>
@@ -93,10 +93,10 @@ import org.apache.hadoop.conf.Configuration;
  *
  * @since 0.3.2
  */
-@ApplicationAnnotation(name="WordCountDemo")
+@ApplicationAnnotation(name = "WordCountDemo")
 public class Application implements StreamingApplication
 {
-   @Override
+  @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
     WordCountInputOperator input = dag.addOperator("wordinput", new WordCountInputOperator());
@@ -104,8 +104,5 @@ public class Application implements StreamingApplication
     dag.addStream("wordinput-count", input.outputPort, wordCount.data);
     ConsoleOutputOperator consoleOperator = dag.addOperator("console", new ConsoleOutputOperator());
     dag.addStream("count-console",wordCount.count, consoleOperator.input);
-
   }
-
-
 }