You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/08/26 21:10:21 UTC

[3/6] apex-malhar git commit: Fixed checkstyle errors for demos.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java
index d140f77..8f68dab 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java
@@ -20,8 +20,6 @@ package com.datatorrent.demos.machinedata.operator;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -33,14 +31,16 @@ import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
-import com.datatorrent.lib.util.KeyValPair;
-
-import com.datatorrent.api.*;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.StreamCodec;
 import com.datatorrent.common.util.BaseOperator;
-
-import com.datatorrent.demos.machinedata.data.*;
+import com.datatorrent.demos.machinedata.data.MachineInfo;
+import com.datatorrent.demos.machinedata.data.MachineKey;
+import com.datatorrent.demos.machinedata.data.ResourceType;
 import com.datatorrent.demos.machinedata.util.DataTable;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.util.KeyValPair;
 
 /**
  * <p>
@@ -179,13 +179,12 @@ public class CalculatorOperator extends BaseOperator
   {
 
     double val = (kthPercentile * sorted.size()) / 100.0;
-    if (val == (int) val) {
+    if (val == (int)val) {
       // Whole number
-      int idx = (int) val - 1;
+      int idx = (int)val - 1;
       return (sorted.get(idx) + sorted.get(idx + 1)) / 2.0;
-    }
-    else {
-      int idx = (int) Math.round(val) - 1;
+    } else {
+      int idx = (int)Math.round(val) - 1;
       return sorted.get(idx);
     }
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java
index 29f700f..bbfd547 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java
@@ -18,18 +18,6 @@
  */
 package com.datatorrent.demos.machinedata.operator;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-
-import com.datatorrent.demos.machinedata.data.MachineInfo;
-import com.datatorrent.demos.machinedata.data.MachineKey;
-import com.datatorrent.demos.machinedata.data.AverageData;
-import com.datatorrent.lib.util.KeyHashValPair;
-import com.datatorrent.lib.util.KeyValPair;
-
-import com.google.common.collect.Maps;
-
 import java.math.BigDecimal;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
@@ -38,6 +26,18 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+import com.datatorrent.demos.machinedata.data.AverageData;
+import com.datatorrent.demos.machinedata.data.MachineInfo;
+import com.datatorrent.demos.machinedata.data.MachineKey;
+import com.datatorrent.lib.util.KeyHashValPair;
+import com.datatorrent.lib.util.KeyValPair;
+
 /**
  * This class calculates the average for various resources across different devices for a given key
  * <p>MachineInfoAveragingOperator class.</p>
@@ -184,7 +184,7 @@ public class MachineInfoAveragingOperator extends BaseOperator
   {
     StringBuilder sb = new StringBuilder();
     if (key instanceof MachineKey) {
-      MachineKey mkey = (MachineKey) key;
+      MachineKey mkey = (MachineKey)key;
       Integer customer = mkey.getCustomer();
       if (customer != null) {
         sb.append("customer: " + customer + "\n");

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java
index 15e6f07..cb5fa5a 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java
@@ -18,19 +18,18 @@
  */
 package com.datatorrent.demos.machinedata.operator;
 
-import com.datatorrent.common.util.BaseOperator;
+import java.util.HashMap;
+import java.util.Map;
+
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 
-import com.datatorrent.demos.machinedata.data.MachineKey;
-import com.datatorrent.demos.machinedata.data.MachineInfo;
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.demos.machinedata.data.AverageData;
+import com.datatorrent.demos.machinedata.data.MachineInfo;
+import com.datatorrent.demos.machinedata.data.MachineKey;
 import com.datatorrent.lib.util.KeyHashValPair;
 
-
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * This class calculates the partial sum and count for tuples generated by upstream Operator
  * <p> MachineInfoAveragingPrerequisitesOperator class. </p>
@@ -51,8 +50,6 @@ public class MachineInfoAveragingPrerequisitesOperator extends BaseOperator
       MachineInfoAveragingUnifier unifier = new MachineInfoAveragingUnifier();
       return unifier;
     }
-
-    ;
   };
 
   public transient DefaultInputPort<MachineInfo> inputPort = new DefaultInputPort<MachineInfo>()
@@ -66,8 +63,7 @@ public class MachineInfoAveragingPrerequisitesOperator extends BaseOperator
       if (averageData == null) {
         averageData = new AverageData(tuple.getCpu(), tuple.getHdd(), tuple.getRam(), 1);
         sums.put(key, averageData);
-      }
-      else {
+      } else {
         averageData.setCpu(averageData.getCpu() + tuple.getCpu());
         averageData.setRam(averageData.getRam() + tuple.getRam());
         averageData.setHdd(averageData.getHdd() + tuple.getHdd());

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java
index 40995b2..e0b67f3 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java
@@ -21,8 +21,8 @@ package com.datatorrent.demos.machinedata.operator;
 import java.util.HashMap;
 import java.util.Map;
 
-import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.Operator.Unifier;
 
 import com.datatorrent.demos.machinedata.data.AverageData;
@@ -80,8 +80,7 @@ public class MachineInfoAveragingUnifier implements Unifier<KeyHashValPair<Machi
     AverageData tupleValue = arg0.getValue();
     if (averageData == null) {
       sums.put(tupleKey, tupleValue);
-    }
-    else {
+    } else {
       averageData.setCpu(averageData.getCpu() + tupleValue.getCpu());
       averageData.setRam(averageData.getRam() + tupleValue.getRam());
       averageData.setHdd(averageData.getHdd() + tupleValue.getHdd());

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java
index 88ee35d..6c4256a 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java
@@ -18,7 +18,11 @@
  */
 package com.datatorrent.demos.machinedata.util;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Generate combinations of elements for the given array of elements.
@@ -27,66 +31,71 @@ import java.util.*;
  *
  * @since 0.3.5
  */
-public class Combinatorics<T> {
+public class Combinatorics<T>
+{
 
-    private T[] values;
-    private int size = -1;
-    private List<T> result;
-    private Map<Integer, List<T>> resultMap = new HashMap<Integer, List<T>>();
-    private int resultMapSize = 0;
+  private T[] values;
+  private int size = -1;
+  private List<T> result;
+  private Map<Integer, List<T>> resultMap = new HashMap<Integer, List<T>>();
+  private int resultMapSize = 0;
 
-    /**
-     * Generates all possible combinations with all the sizes.
-     *
-     * @param values
-     */
-    public Combinatorics(T[] values) {
-        this.values = values;
-        this.size = -1;
-        this.result = new ArrayList<>();
-    }
+  /**
+   * Generates all possible combinations with all the sizes.
+   *
+   * @param values
+   */
+  public Combinatorics(T[] values)
+  {
+    this.values = values;
+    this.size = -1;
+    this.result = new ArrayList<>();
+  }
 
-    /**
-     * Generates all possible combinations with the given size.
-     *
-     * @param values
-     * @param size
-     */
-    public Combinatorics(T[] values, int size) {
-        this.values = values;
-        this.size = size;
-        this.result = new ArrayList<>();
-    }
+  /**
+   * Generates all possible combinations with the given size.
+   *
+   * @param values
+   * @param size
+   */
+  public Combinatorics(T[] values, int size)
+  {
+    this.values = values;
+    this.size = size;
+    this.result = new ArrayList<>();
+  }
 
-    public Map<Integer, List<T>> generate() {
+  public Map<Integer, List<T>> generate()
+  {
 
-        if (size == -1) {
-            size = values.length;
-            for (int i = 1; i <= size; i++) {
-                int[] tmp = new int[i];
-                Arrays.fill(tmp, -1);
-                generateCombinations(0, 0, tmp);
-            }
-        } else {
-            int[] tmp = new int[size];
-            Arrays.fill(tmp, -1);
-            generateCombinations(0, 0, tmp);
-        }
-        return resultMap;
+    if (size == -1) {
+      size = values.length;
+      for (int i = 1; i <= size; i++) {
+        int[] tmp = new int[i];
+        Arrays.fill(tmp, -1);
+        generateCombinations(0, 0, tmp);
+      }
+    } else {
+      int[] tmp = new int[size];
+      Arrays.fill(tmp, -1);
+      generateCombinations(0, 0, tmp);
     }
+    return resultMap;
+  }
 
-    public void generateCombinations(int start, int depth, int[] tmp) {
-        if (depth == tmp.length) {
-            for (int j = 0; j < depth; j++) {
-                result.add(values[tmp[j]]);
-            }
-            resultMap.put(++resultMapSize, result);
-            result = new ArrayList<>();
-            return;
-        }
-        for (int i = start; i < values.length; i++) {
-            tmp[depth] = i;
-            generateCombinations(i + 1, depth + 1, tmp);
-        }
+  public void generateCombinations(int start, int depth, int[] tmp)
+  {
+    if (depth == tmp.length) {
+      for (int j = 0; j < depth; j++) {
+        result.add(values[tmp[j]]);
+      }
+      resultMap.put(++resultMapSize, result);
+      result = new ArrayList<>();
+      return;
+    }
+    for (int i = start; i < values.length; i++) {
+      tmp[depth] = i;
+      generateCombinations(i + 1, depth + 1, tmp);
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java
index 8820400..f8f2d33 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java
@@ -30,36 +30,46 @@ import com.google.common.collect.Maps;
  *
  * @since 0.3.5
  */
-public class DataTable<R,C,E>   {
+public class DataTable<R,C,E>
+{
 
-	//machineKey, [cpu,ram,hdd] -> value
-	private final Map<R,Map<C,E>> table= Maps.newHashMap();
+  //machineKey, [cpu,ram,hdd] -> value
+  private final Map<R,Map<C,E>> table = Maps.newHashMap();
 
-	public boolean containsRow(R rowKey){
-		return table.containsKey(rowKey);
-	}
+  public boolean containsRow(R rowKey)
+  {
+    return table.containsKey(rowKey);
+  }
 
-	public void put(R rowKey,C colKey, E entry){
-		if(!containsRow(rowKey)){
-			table.put(rowKey, Maps.<C,E>newHashMap());
-		}
-		table.get(rowKey).put(colKey, entry);
-	}
+  public void put(R rowKey,C colKey, E entry)
+  {
+    if (!containsRow(rowKey)) {
+      table.put(rowKey, Maps.<C,E>newHashMap());
+    }
+    table.get(rowKey).put(colKey, entry);
+  }
 
-	@Nullable public E get(R rowKey, C colKey){
-		if(!containsRow(rowKey)) return null;
-		return table.get(rowKey).get(colKey);
-	}
+  @Nullable
+  public E get(R rowKey, C colKey)
+  {
+    if (!containsRow(rowKey)) {
+      return null;
+    }
+    return table.get(rowKey).get(colKey);
+  }
 
-	public Set<R> rowKeySet(){
-		return table.keySet();
-	}
+  public Set<R> rowKeySet()
+  {
+    return table.keySet();
+  }
 
-	public void clear(){
-		table.clear();
-	}
+  public void clear()
+  {
+    table.clear();
+  }
 
-	public Map<R,Map<C,E>> getTable(){
-		return table;
-	}
+  public Map<R,Map<C,E>> getTable()
+  {
+    return table;
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java b/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java
index 1a26bd1..0e397be 100644
--- a/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java
+++ b/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java
@@ -18,6 +18,20 @@
  */
 package com.datatorrent.demos.machinedata;
 
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+
 import com.datatorrent.demos.machinedata.data.MachineInfo;
 import com.datatorrent.demos.machinedata.data.MachineKey;
 import com.datatorrent.demos.machinedata.data.ResourceType;
@@ -26,20 +40,6 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.datatorrent.lib.util.KeyValPair;
 import com.datatorrent.lib.util.TimeBucketKey;
 
-import com.google.common.collect.ImmutableList;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
 /**
  * @since 0.3.5
  */
@@ -94,7 +94,7 @@ public class CalculatorOperatorTest
     Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
     for (Object o : sortSink.collectedTuples) {
       LOG.debug(o.toString());
-      KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>) o;
+      KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o;
       Assert.assertEquals("emitted value for 'cpu' was ", 2.0, keyValPair.getValue().get(ResourceType.CPU), 0);
       Assert.assertEquals("emitted value for 'hdd' was ", 1.0, keyValPair.getValue().get(ResourceType.HDD), 0);
       Assert.assertEquals("emitted value for 'ram' was ", 1.0, keyValPair.getValue().get(ResourceType.RAM), 0);
@@ -132,7 +132,7 @@ public class CalculatorOperatorTest
     Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
     for (Object o : sortSink.collectedTuples) {
       LOG.debug(o.toString());
-      KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>) o;
+      KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o;
       Assert.assertEquals("emitted value for 'cpu' was ", getSD(ImmutableList.of(1, 2, 3)), keyValPair.getValue().get(ResourceType.CPU), 0);
       Assert.assertEquals("emitted value for 'hdd' was ", getSD(ImmutableList.of(1, 1, 1)), keyValPair.getValue().get(ResourceType.HDD), 0);
       Assert.assertEquals("emitted value for 'ram' was ", getSD(ImmutableList.of(1, 1, 1)), keyValPair.getValue().get(ResourceType.RAM), 0);
@@ -184,7 +184,7 @@ public class CalculatorOperatorTest
     Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
     for (Object o : sortSink.collectedTuples) {
       LOG.debug(o.toString());
-      KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>) o;
+      KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o;
       Assert.assertEquals("emitted value for 'cpu' was ", 3, keyValPair.getValue().get(ResourceType.CPU), 0);
       Assert.assertEquals("emitted value for 'hdd' was ", 1, keyValPair.getValue().get(ResourceType.HDD), 0);
       Assert.assertEquals("emitted value for 'ram' was ", 1, keyValPair.getValue().get(ResourceType.RAM), 0);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java
----------------------------------------------------------------------
diff --git a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java
index 9d9f31b..30d7281 100644
--- a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java
+++ b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java
@@ -18,6 +18,18 @@
  */
 package com.datatorrent.demos.mobile;
 
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Random;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.commons.lang3.Range;
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StatsListener;
@@ -28,16 +40,6 @@ import com.datatorrent.lib.io.PubSubWebSocketInputOperator;
 import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
 import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner;
 import com.datatorrent.lib.testbench.RandomEventGenerator;
-import org.apache.commons.lang.mutable.MutableLong;
-import org.apache.commons.lang3.Range;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.Random;
 
 /**
  * Mobile Demo Application:

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java
----------------------------------------------------------------------
diff --git a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java
index 3b1e49d..8964d84 100644
--- a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java
+++ b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java
@@ -18,18 +18,20 @@
  */
 package com.datatorrent.demos.mobile;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
+import java.util.Map;
+import java.util.Random;
+import javax.validation.constraints.Min;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.validation.constraints.Min;
-import java.util.Map;
-import java.util.Random;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * Generates mobile numbers that will be displayed in mobile demo just after launch.<br></br>
@@ -99,7 +101,8 @@ public class PhoneEntryOperator extends BaseOperator
   public final transient DefaultOutputPort<Map<String, String>> seedPhones = new DefaultOutputPort<Map<String, String>>();
 
   @Override
-  public void beginWindow(long windowId){
+  public void beginWindow(long windowId)
+  {
     if (!seedGenerationDone) {
       Random random = new Random();
       int maxPhone = (maxSeedPhoneNumber <= rangeUpperEndpoint && maxSeedPhoneNumber >= rangeLowerEndpoint) ? maxSeedPhoneNumber : rangeUpperEndpoint;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneMovementGenerator.java
----------------------------------------------------------------------
diff --git a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneMovementGenerator.java b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneMovementGenerator.java
index 8db74cd..a46e6d4 100644
--- a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneMovementGenerator.java
+++ b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneMovementGenerator.java
@@ -25,20 +25,20 @@ import java.util.Set;
 
 import javax.validation.constraints.Min;
 
-import org.apache.commons.lang.mutable.MutableLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang.mutable.MutableLong;
+
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.lib.counters.BasicCounters;
 import com.datatorrent.lib.util.HighLow;
 
@@ -73,8 +73,7 @@ public class PhoneMovementGenerator extends BaseOperator
       if (delta >= threshold) {
         if (state < 2) {
           xloc++;
-        }
-        else {
+        } else {
           xloc--;
         }
         if (xloc < 0) {
@@ -85,8 +84,7 @@ public class PhoneMovementGenerator extends BaseOperator
       if (delta >= threshold) {
         if ((state == 1) || (state == 3)) {
           yloc++;
-        }
-        else {
+        } else {
           yloc--;
         }
         if (yloc < 0) {
@@ -100,8 +98,7 @@ public class PhoneMovementGenerator extends BaseOperator
       HighLow<Integer> nloc = newgps.get(tuple);
       if (nloc == null) {
         newgps.put(tuple, new HighLow<Integer>(xloc, yloc));
-      }
-      else {
+      } else {
         nloc.setHigh(xloc);
         nloc.setLow(yloc);
       }
@@ -109,7 +106,7 @@ public class PhoneMovementGenerator extends BaseOperator
     }
   };
 
-  @InputPortFieldAnnotation(optional=true)
+  @InputPortFieldAnnotation(optional = true)
   public final transient DefaultInputPort<Map<String,String>> phoneQuery = new DefaultInputPort<Map<String,String>>()
   {
     @Override
@@ -120,19 +117,16 @@ public class PhoneMovementGenerator extends BaseOperator
       if (command != null) {
         if (command.equals(COMMAND_ADD)) {
           commandCounters.getCounter(CommandCounters.ADD).increment();
-          String phoneStr= tuple.get(KEY_PHONE);
+          String phoneStr = tuple.get(KEY_PHONE);
           registerPhone(phoneStr);
-        }
-        else if (command.equals(COMMAND_ADD_RANGE)) {
+        } else if (command.equals(COMMAND_ADD_RANGE)) {
           commandCounters.getCounter(CommandCounters.ADD_RANGE).increment();
           registerPhoneRange(tuple.get(KEY_START_PHONE), tuple.get(KEY_END_PHONE));
-        }
-        else if (command.equals(COMMAND_DELETE)) {
+        } else if (command.equals(COMMAND_DELETE)) {
           commandCounters.getCounter(CommandCounters.DELETE).increment();
-          String phoneStr= tuple.get(KEY_PHONE);
+          String phoneStr = tuple.get(KEY_PHONE);
           deregisterPhone(phoneStr);
-        }
-        else if (command.equals(COMMAND_CLEAR)) {
+        } else if (command.equals(COMMAND_CLEAR)) {
           commandCounters.getCounter(CommandCounters.CLEAR).increment();
           clearPhones();
         }
@@ -181,7 +175,7 @@ public class PhoneMovementGenerator extends BaseOperator
 
   /**
    * Sets the range of phone numbers for which the GPS locations need to be generated.
-   * 
+   *
    * @param i the range of phone numbers to set
    */
   public void setRange(int i)
@@ -190,7 +184,7 @@ public class PhoneMovementGenerator extends BaseOperator
   }
 
   /**
-   * @return the threshold 
+   * @return the threshold
    */
   @Min(0)
   public int getThreshold()
@@ -200,7 +194,7 @@ public class PhoneMovementGenerator extends BaseOperator
 
   /**
    * Sets the threshold that decides how frequently the GPS locations are updated.
-   * 
+   *
    * @param i the value that decides how frequently the GPS locations change.
    */
   public void setThreshold(int i)
@@ -217,8 +211,7 @@ public class PhoneMovementGenerator extends BaseOperator
     try {
       Integer phone = new Integer(phoneStr);
       registerSinglePhone(phone);
-    }
-    catch (NumberFormatException nfe) {
+    } catch (NumberFormatException nfe) {
       LOG.warn("Invalid no {}", phoneStr);
     }
   }
@@ -239,8 +232,7 @@ public class PhoneMovementGenerator extends BaseOperator
       for (int i = startPhone; i <= endPhone; i++) {
         registerSinglePhone(i);
       }
-    }
-    catch (NumberFormatException nfe) {
+    } catch (NumberFormatException nfe) {
       LOG.warn("Invalid phone range <{},{}>", startPhoneStr, endPhoneStr);
     }
   }
@@ -265,13 +257,13 @@ public class PhoneMovementGenerator extends BaseOperator
         LOG.debug("Removing query id {}", phone);
         emitPhoneRemoved(phone);
       }
-    }
-    catch (NumberFormatException nfe) {
+    } catch (NumberFormatException nfe) {
       LOG.warn("Invalid phone {}", phoneStr);
     }
   }
 
-  private void clearPhones() {
+  private void clearPhones()
+  {
     phoneRegister.clear();
     LOG.info("Clearing phones");
   }
@@ -298,8 +290,7 @@ public class PhoneMovementGenerator extends BaseOperator
       HighLow<Integer> loc = gps.get(e.getKey());
       if (loc == null) {
         gps.put(e.getKey(), e.getValue());
-      }
-      else {
+      } else {
         loc.setHigh(e.getValue().getHigh());
         loc.setLow(e.getValue().getLow());
       }
@@ -316,7 +307,8 @@ public class PhoneMovementGenerator extends BaseOperator
     context.setCounters(commandCounters);
   }
 
-  private void emitQueryResult(Integer phone) {
+  private void emitQueryResult(Integer phone)
+  {
     HighLow<Integer> loc = gps.get(phone);
     if (loc != null) {
       Map<String, String> queryResult = new HashMap<String, String>();
@@ -328,7 +320,7 @@ public class PhoneMovementGenerator extends BaseOperator
 
   private void emitPhoneRemoved(Integer phone)
   {
-    Map<String,String> removedResult= Maps.newHashMap();
+    Map<String,String> removedResult = Maps.newHashMap();
     removedResult.put(KEY_PHONE, String.valueOf(phone));
     removedResult.put(KEY_REMOVED,"true");
     locationQueryResult.emit(removedResult);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java b/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java
index 72d6514..87e40bf 100644
--- a/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java
+++ b/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java
@@ -35,13 +35,13 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
 
+import com.datatorrent.api.LocalMode;
+
 import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet;
 import com.datatorrent.lib.io.PubSubWebSocketInputOperator;
 import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 
-import com.datatorrent.api.LocalMode;
-
 public class ApplicationTest
 {
   private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
@@ -65,7 +65,7 @@ public class ApplicationTest
     contextHandler.addServlet(sh, "/pubsub");
     contextHandler.addServlet(sh, "/*");
     server.start();
-    Connector connector[] = server.getConnectors();
+    Connector[] connector = server.getConnectors();
     conf.set("dt.attr.GATEWAY_CONNECT_ADDRESS", "localhost:" + connector[0].getLocalPort());
     URI uri = URI.create("ws://localhost:" + connector[0].getLocalPort() + "/pubsub");
 
@@ -111,7 +111,7 @@ public class ApplicationTest
     server.stop();
     Assert.assertTrue("size of output is 5 ", sink.collectedTuples.size() == 5);
     for (Object obj : sink.collectedTuples) {
-      Assert.assertEquals("Expected phone number", "5559990", ((Map<String, String>) obj).get("phone"));
+      Assert.assertEquals("Expected phone number", "5559990", ((Map<String, String>)obj).get("phone"));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Application.java
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Application.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Application.java
index 245d9c4..5625439 100644
--- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Application.java
+++ b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Application.java
@@ -20,20 +20,19 @@ package com.datatorrent.demos.mrmonitor;
 
 import org.apache.hadoop.conf.Configuration;
 
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.testbench.SeedEventGenerator;
-
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.SeedEventGenerator;
 
 /**
  * Application
  *
  * @since 2.0.0
  */
-@ApplicationAnnotation(name="MyFirstApplication")
+@ApplicationAnnotation(name = "MyFirstApplication")
 public class Application implements StreamingApplication
 {
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Constants.java
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Constants.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Constants.java
index 2f3d651..7930405 100644
--- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Constants.java
+++ b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Constants.java
@@ -26,23 +26,23 @@ package com.datatorrent.demos.mrmonitor;
 public interface Constants
 {
 
-  public final static int MAX_NUMBER_OF_JOBS = 25;
+  public static final int MAX_NUMBER_OF_JOBS = 25;
 
-  public final static String REDUCE_TASK_TYPE = "REDUCE";
-  public final static String MAP_TASK_TYPE = "MAP";
-  public final static String TASK_TYPE = "type";
-  public final static String TASK_ID = "id";
+  public static final String REDUCE_TASK_TYPE = "REDUCE";
+  public static final String MAP_TASK_TYPE = "MAP";
+  public static final String TASK_TYPE = "type";
+  public static final String TASK_ID = "id";
 
-  public final static String LEAGACY_TASK_ID = "taskId";
-  public final static int MAX_TASKS = 2000;
+  public static final String LEAGACY_TASK_ID = "taskId";
+  public static final int MAX_TASKS = 2000;
 
-  public final static String QUERY_APP_ID = "app_id";
-  public final static String QUERY_JOB_ID = "job_id";
-  public final static String QUERY_HADOOP_VERSION = "hadoop_version";
-  public final static String QUERY_API_VERSION = "api_version";
-  public final static String QUERY_RM_PORT = "rm_port";
-  public final static String QUERY_HS_PORT = "hs_port";
-  public final static String QUERY_HOST_NAME = "hostname";
+  public static final String QUERY_APP_ID = "app_id";
+  public static final String QUERY_JOB_ID = "job_id";
+  public static final String QUERY_HADOOP_VERSION = "hadoop_version";
+  public static final String QUERY_API_VERSION = "api_version";
+  public static final String QUERY_RM_PORT = "rm_port";
+  public static final String QUERY_HS_PORT = "hs_port";
+  public static final String QUERY_HOST_NAME = "hostname";
   public static final String QUERY_KEY_COMMAND = "command";
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRJobStatusOperator.java
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRJobStatusOperator.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRJobStatusOperator.java
index 88863e2..263a1a7 100644
--- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRJobStatusOperator.java
+++ b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRJobStatusOperator.java
@@ -18,7 +18,11 @@
  */
 package com.datatorrent.demos.mrmonitor;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 
 import org.codehaus.jettison.json.JSONArray;
@@ -107,8 +111,7 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler
           outputJsonObject.put("id", mrStatusObj.getJobId());
           outputJsonObject.put("removed", "true");
           output.emit(outputJsonObject.toString());
-        }
-        catch (JSONException e) {
+        } catch (JSONException e) {
           LOG.warn("Error creating JSON: {}", e.getMessage());
         }
         return;
@@ -123,8 +126,7 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler
       }
       if (mrStatusObj.getHadoopVersion() == 2) {
         getJsonForJob(mrStatusObj);
-      }
-      else if (mrStatusObj.getHadoopVersion() == 1) {
+      } else if (mrStatusObj.getHadoopVersion() == 1) {
         getJsonForLegacyJob(mrStatusObj);
       }
       mrStatusObj.setStatusHistoryCount(statusHistoryTime);
@@ -204,8 +206,7 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler
     if (jsonObj != null) {
       if (statusObj.getMetricObject() == null) {
         statusObj.setMetricObject(new TaskObject(jsonObj));
-      }
-      else if (!statusObj.getMetricObject().getJsonString().equalsIgnoreCase(jsonObj.toString())) {
+      } else if (!statusObj.getMetricObject().getJsonString().equalsIgnoreCase(jsonObj.toString())) {
         statusObj.getMetricObject().setJson(jsonObj);
         statusObj.getMetricObject().setModified(true);
       }
@@ -252,8 +253,7 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler
               continue;
             }
             reduceTaskOject.put(taskObj.getString(Constants.TASK_ID), new TaskObject(taskObj));
-          }
-          else {
+          } else {
             if (mapTaskOject.get(taskObj.getString(Constants.TASK_ID)) != null) {
               TaskObject tempTaskObj = mapTaskOject.get(taskObj.getString(Constants.TASK_ID));
               if (tempTaskObj.getJsonString().equals(taskObj.toString())) {
@@ -269,8 +269,7 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler
         }
         statusObj.setMapJsonObject(mapTaskOject);
         statusObj.setReduceJsonObject(reduceTaskOject);
-      }
-      catch (Exception e) {
+      } catch (Exception e) {
         LOG.info("exception: {}", e.getMessage());
       }
     }
@@ -324,12 +323,11 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler
   {
     try {
       JSONObject jobJson = statusObj.getJsonObject();
-      int totalTasks = ((JSONObject) ((JSONObject) jobJson.get(type + "TaskSummary")).get("taskStats")).getInt("numTotalTasks");
+      int totalTasks = ((JSONObject)((JSONObject)jobJson.get(type + "TaskSummary")).get("taskStats")).getInt("numTotalTasks");
       Map<String, TaskObject> taskMap;
       if (type.equalsIgnoreCase("map")) {
         taskMap = statusObj.getMapJsonObject();
-      }
-      else {
+      } else {
         taskMap = statusObj.getReduceJsonObject();
       }
 
@@ -371,12 +369,10 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler
 
       if (type.equalsIgnoreCase("map")) {
         statusObj.setMapJsonObject(taskMap);
-      }
-      else {
+      } else {
         statusObj.setReduceJsonObject(taskMap);
       }
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       LOG.info(e.getMessage());
     }
 
@@ -387,8 +383,7 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler
   {
     try {
       Thread.sleep(sleepTime);//
-    }
-    catch (InterruptedException ie) {
+    } catch (InterruptedException ie) {
       // If this thread was intrrupted by nother thread
     }
     if (!iterator.hasNext()) {
@@ -399,8 +394,7 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler
       MRStatusObject obj = iterator.next();
       if (obj.getHadoopVersion() == 2) {
         getJsonForJob(obj);
-      }
-      else if (obj.getHadoopVersion() == 1) {
+      } else if (obj.getHadoopVersion() == 1) {
         getJsonForLegacyJob(obj);
       }
     }
@@ -465,8 +459,7 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler
       outputJsonObject.put("tasks", arr);
       reduceOutput.emit(outputJsonObject.toString());
       obj.setRetrials(0);
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       LOG.warn("error creating json {}", e.getMessage());
     }
 
@@ -543,17 +536,14 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler
         if (!modified) {
           if (obj.getRetrials() >= maxRetrials) {
             delList.add(obj.getJobId());
-          }
-          else {
+          } else {
             obj.setRetrials(obj.getRetrials() + 1);
           }
-        }
-        else {
+        } else {
           obj.setRetrials(0);
         }
       }
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       LOG.warn("error creating json {}", ex.getMessage());
     }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRMonitoringApplication.java
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRMonitoringApplication.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRMonitoringApplication.java
index 5758ad1..037378a 100644
--- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRMonitoringApplication.java
+++ b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRMonitoringApplication.java
@@ -20,10 +20,11 @@ package com.datatorrent.demos.mrmonitor;
 
 import java.net.URI;
 
-import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRStatusObject.java
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRStatusObject.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRStatusObject.java
index f0471f3..481f3dc 100644
--- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRStatusObject.java
+++ b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRStatusObject.java
@@ -149,7 +149,8 @@ public class MRStatusObject
     virtualMemoryStatusHistory = new LinkedList<String>();
     cpuStatusHistory = new LinkedList<String>();
     statusScheduler = Executors.newScheduledThreadPool(1);
-    statusScheduler.scheduleAtFixedRate(new Runnable() {
+    statusScheduler.scheduleAtFixedRate(new Runnable()
+    {
       @Override
       public void run()
       {
@@ -333,12 +334,15 @@ public class MRStatusObject
   @Override
   public boolean equals(Object that)
   {
-    if (this == that)
+    if (this == that) {
       return true;
-    if (!(that instanceof MRStatusObject))
+    }
+    if (!(that instanceof MRStatusObject)) {
       return false;
-    if (this.hashCode() == that.hashCode())
+    }
+    if (this.hashCode() == that.hashCode()) {
       return true;
+    }
     return false;
   }
 
@@ -443,7 +447,7 @@ public class MRStatusObject
 
     /**
      * This returns the task information as json
-     * 
+     *
      * @return
      */
     public JSONObject getJson()
@@ -453,7 +457,7 @@ public class MRStatusObject
 
     /**
      * This stores the task information as json
-     * 
+     *
      * @param json
      */
     public void setJson(JSONObject json)
@@ -463,7 +467,7 @@ public class MRStatusObject
 
     /**
      * This returns if the json object has been modified
-     * 
+     *
      * @return
      */
     public boolean isModified()
@@ -473,7 +477,7 @@ public class MRStatusObject
 
     /**
      * This sets if the json object is modified
-     * 
+     *
      * @param modified
      */
     public void setModified(boolean modified)
@@ -483,7 +487,7 @@ public class MRStatusObject
 
     /**
      * This returns the string format of the json object
-     * 
+     *
      * @return
      */
     public String getJsonString()

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRUtil.java
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRUtil.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRUtil.java
index cb10347..0d7f6af 100644
--- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRUtil.java
+++ b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRUtil.java
@@ -20,15 +20,16 @@ package com.datatorrent.demos.mrmonitor;
 
 import java.io.IOException;
 
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.ResponseHandler;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.BasicResponseHandler;
 import org.apache.http.impl.client.DefaultHttpClient;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * <p>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MapToMRObjectOperator.java
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MapToMRObjectOperator.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MapToMRObjectOperator.java
index e37454f..5075163 100644
--- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MapToMRObjectOperator.java
+++ b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MapToMRObjectOperator.java
@@ -33,7 +33,8 @@ import com.datatorrent.api.Operator;
 public class MapToMRObjectOperator implements Operator
 {
 
-  public final transient DefaultInputPort<Map<String, String>> input = new DefaultInputPort<Map<String, String>>() {
+  public final transient DefaultInputPort<Map<String, String>> input = new DefaultInputPort<Map<String, String>>()
+  {
     @Override
     public void process(Map<String, String> tuple)
     {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mrmonitor/src/test/java/com/datatorrent/demos/mrmonitor/MrMonitoringApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/test/java/com/datatorrent/demos/mrmonitor/MrMonitoringApplicationTest.java b/demos/mrmonitor/src/test/java/com/datatorrent/demos/mrmonitor/MrMonitoringApplicationTest.java
index 70cf840..ad8de02 100644
--- a/demos/mrmonitor/src/test/java/com/datatorrent/demos/mrmonitor/MrMonitoringApplicationTest.java
+++ b/demos/mrmonitor/src/test/java/com/datatorrent/demos/mrmonitor/MrMonitoringApplicationTest.java
@@ -28,9 +28,8 @@ import org.junit.Test;
 
 import org.apache.hadoop.conf.Configuration;
 
-import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet;
-
 import com.datatorrent.api.LocalMode;
+import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet;
 
 /**
  * <p>MapReduceDebuggerApplicationTest class.</p>
@@ -53,7 +52,7 @@ public class MrMonitoringApplicationTest
     contextHandler.addServlet(sh, "/pubsub");
     contextHandler.addServlet(sh, "/*");
     server.start();
-    Connector connector[] = server.getConnectors();
+    Connector[] connector = server.getConnectors();
     conf.set("dt.attr.GATEWAY_CONNECT_ADDRESS", "localhost:" + connector[0].getLocalPort());
 
     MRMonitoringApplication application = new MRMonitoringApplication();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/DateWritable.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/DateWritable.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/DateWritable.java
index 8ecf76e..5dbd83f 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/DateWritable.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/DateWritable.java
@@ -18,11 +18,11 @@
  */
 package com.datatorrent.demos.mroperator;
 
-import java.text.SimpleDateFormat;
-import java.util.Date;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 
 import org.apache.hadoop.io.WritableComparable;
 
@@ -33,44 +33,48 @@ import org.apache.hadoop.io.WritableComparable;
  */
 public class DateWritable implements WritableComparable<DateWritable>
 {
-	private final static SimpleDateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd' T 'HH:mm:ss.SSS" );
-	private Date date;
+  private static final SimpleDateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd' T 'HH:mm:ss.SSS" );
+  private Date date;
+
+  public Date getDate()
+  {
+    return date;
+  }
+
+  public void setDate( Date date )
+  {
+    this.date = date;
+  }
 
-	public Date getDate()
-	{
-		return date;
-	}
+  public void readFields( DataInput in ) throws IOException
+  {
+    date = new Date( in.readLong() );
+  }
 
-	public void setDate( Date date )
-	{
-		this.date = date;
-	}
+  public void write( DataOutput out ) throws IOException
+  {
+    out.writeLong( date.getTime() );
+  }
 
-	public void readFields( DataInput in ) throws IOException
-	{
-		date = new Date( in.readLong() );
-	}
+  @Override
+  public boolean equals(Object o)
+  {
+    return toString().equals(o.toString());
+  }
 
-	public void write( DataOutput out ) throws IOException
-	{
-		out.writeLong( date.getTime() );
-	}
+  @Override
+  public int hashCode()
+  {
+    return toString().hashCode();
+  }
 
-	@Override
-	public boolean equals(Object o){
-		return toString().equals(o.toString());
-	}
-	@Override
-	public int hashCode(){
-		return toString().hashCode();
-	}
-	public String toString()
-	{
-		return formatter.format( date);
-	}
+  public String toString()
+  {
+    return formatter.format( date);
+  }
 
-    public int compareTo( DateWritable other )
-    {
-        return date.compareTo( other.getDate() );
-    }
+  public int compareTo( DateWritable other )
+  {
+    return date.compareTo( other.getDate() );
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/HdfsKeyValOutputOperator.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/HdfsKeyValOutputOperator.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/HdfsKeyValOutputOperator.java
index b6b9735..c4b9c49 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/HdfsKeyValOutputOperator.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/HdfsKeyValOutputOperator.java
@@ -36,6 +36,6 @@ public class HdfsKeyValOutputOperator<K, V> extends AbstractSingleFileOutputOper
   @Override
   public byte[] getBytesForTuple(KeyHashValPair<K,V> t)
   {
-    return (t.toString()+"\n").getBytes();
+    return (t.toString() + "\n").getBytes();
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/InvertedIndexApplication.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/InvertedIndexApplication.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/InvertedIndexApplication.java
index dae07a2..076b8ac 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/InvertedIndexApplication.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/InvertedIndexApplication.java
@@ -29,7 +29,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
  *
  * @since 0.9.0
  */
-@ApplicationAnnotation(name="InvertedIndexDemo")
+@ApplicationAnnotation(name = "InvertedIndexDemo")
 public class InvertedIndexApplication extends MapReduceApplication<LongWritable, Text, Text, Text>
 {
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LineIndexer.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LineIndexer.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LineIndexer.java
index aabea81..e963954 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LineIndexer.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LineIndexer.java
@@ -41,18 +41,18 @@ import org.apache.hadoop.mapred.Reporter;
  *
  * @since 0.9.0
  */
-public class LineIndexer {
+public class LineIndexer
+{
 
   public static class LineIndexMapper extends MapReduceBase
-      implements Mapper<LongWritable, Text, Text, Text> {
-
-    private final static Text word = new Text();
-    private final static Text location = new Text();
+      implements Mapper<LongWritable, Text, Text, Text>
+  {
+    private static final Text word = new Text();
+    private static final Text location = new Text();
 
     public void map(LongWritable key, Text val,
-        OutputCollector<Text, Text> output, Reporter reporter)
-        throws IOException {
-
+        OutputCollector<Text, Text> output, Reporter reporter) throws IOException
+    {
       FileSplit fileSplit = (FileSplit)reporter.getInputSplit();
       String fileName = fileSplit.getPath().getName();
       location.set(fileName);
@@ -69,18 +69,18 @@ public class LineIndexer {
 
 
   public static class LineIndexReducer extends MapReduceBase
-      implements Reducer<Text, Text, Text, Text> {
-
+      implements Reducer<Text, Text, Text, Text>
+  {
     public void reduce(Text key, Iterator<Text> values,
-        OutputCollector<Text, Text> output, Reporter reporter)
-        throws IOException {
-
+        OutputCollector<Text, Text> output, Reporter reporter) throws IOException
+    {
       boolean first = true;
       StringBuilder toReturn = new StringBuilder();
-      while (values.hasNext()){
-        if (!first)
+      while (values.hasNext()) {
+        if (!first) {
           toReturn.append(", ");
-        first=false;
+        }
+        first = false;
         toReturn.append(values.next().toString());
       }
 
@@ -93,7 +93,8 @@ public class LineIndexer {
    * The actual main() method for our program; this is the
    * "driver" for the MapReduce job.
    */
-  public static void main(String[] args) {
+  public static void main(String[] args)
+  {
     JobClient client = new JobClient();
     JobConf conf = new JobConf(LineIndexer.class);
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogCountsPerHour.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogCountsPerHour.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogCountsPerHour.java
index 793ad4d..69ee892 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogCountsPerHour.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogCountsPerHour.java
@@ -18,201 +18,170 @@
  */
 package com.datatorrent.demos.mroperator;
 
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.Iterator;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-
-import java.io.IOException;
-import java.util.Calendar;
-import java.util.Iterator;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 
 /**
  * <p>LogCountsPerHour class.</p>
  *
  * @since 0.9.0
  */
-public class LogCountsPerHour extends Configured implements Tool {
+public class LogCountsPerHour extends Configured implements Tool
+{
 
-    public static class LogMapClass extends MapReduceBase
-            implements Mapper<LongWritable, Text, DateWritable, IntWritable>
-    {
-        private DateWritable date = new DateWritable();
-        private final static IntWritable one = new IntWritable( 1 );
-
-        public void map( LongWritable key, // Offset into the file
-                         Text value,
-                         OutputCollector<DateWritable, IntWritable> output,
-                         Reporter reporter) throws IOException
-        {
-            // Get the value as a String; it is of the format:
-        	// 111.111.111.111 - - [16/Dec/2012:05:32:50 -0500] "GET / HTTP/1.1" 200 14791 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
-            String text = value.toString();
-
-            // Get the date and time
-            int openBracket = text.indexOf( '[' );
-            int closeBracket = text.indexOf( ']' );
-            if( openBracket != -1 && closeBracket != -1 )
-            {
-            	// Read the date
-            	String dateString = text.substring( text.indexOf( '[' ) + 1, text.indexOf( ']' ) );
-
-            	// Build a date object from a string of the form: 16/Dec/2012:05:32:50 -0500
-                int index = 0;
-                int nextIndex = dateString.indexOf( '/' );
-                int day = Integer.parseInt( dateString.substring(index, nextIndex) );
-
-                index = nextIndex;
-                nextIndex = dateString.indexOf( '/', index+1 );
-                String month = dateString.substring( index+1, nextIndex );
-
-                index = nextIndex;
-                nextIndex = dateString.indexOf( ':', index );
-                int year = Integer.parseInt(dateString.substring(index + 1, nextIndex));
-
-                index = nextIndex;
-                nextIndex = dateString.indexOf( ':', index+1 );
-                int hour = Integer.parseInt(dateString.substring(index + 1, nextIndex));
-
-                // Build a calendar object for this date
-                Calendar calendar = Calendar.getInstance();
-                calendar.set( Calendar.DATE, day );
-                calendar.set( Calendar.YEAR, year );
-                calendar.set( Calendar.HOUR, hour );
-                calendar.set( Calendar.MINUTE, 0 );
-                calendar.set( Calendar.SECOND, 0 );
-                calendar.set( Calendar.MILLISECOND, 0 );
-
-                if( month.equalsIgnoreCase( "dec" ) )
-                {
-                    calendar.set( Calendar.MONTH, Calendar.DECEMBER );
-                }
-                else if( month.equalsIgnoreCase( "nov" ) )
-                {
-                    calendar.set( Calendar.MONTH, Calendar.NOVEMBER );
-                }
-                else if( month.equalsIgnoreCase( "oct" ) )
-                {
-                    calendar.set( Calendar.MONTH, Calendar.OCTOBER );
-                }
-                else if( month.equalsIgnoreCase( "sep" ) )
-                {
-                    calendar.set( Calendar.MONTH, Calendar.SEPTEMBER );
-                }
-                else if( month.equalsIgnoreCase( "aug" ) )
-                {
-                    calendar.set( Calendar.MONTH, Calendar.AUGUST );
-                }
-                else if( month.equalsIgnoreCase( "jul" ) )
-                {
-                    calendar.set( Calendar.MONTH, Calendar.JULY );
-                }
-                else if( month.equalsIgnoreCase( "jun" ) )
-                {
-                    calendar.set( Calendar.MONTH, Calendar.JUNE );
-                }
-                else if( month.equalsIgnoreCase( "may" ) )
-                {
-                    calendar.set( Calendar.MONTH, Calendar.MAY );
-                }
-                else if( month.equalsIgnoreCase( "apr" ) )
-                {
-                    calendar.set( Calendar.MONTH, Calendar.APRIL );
-                }
-                else if( month.equalsIgnoreCase( "mar" ) )
-                {
-                    calendar.set( Calendar.MONTH, Calendar.MARCH );
-                }
-                else if( month.equalsIgnoreCase( "feb" ) )
-                {
-                    calendar.set( Calendar.MONTH, Calendar.FEBRUARY );
-                }
-                else if( month.equalsIgnoreCase( "jan" ) )
-                {
-                    calendar.set( Calendar.MONTH, Calendar.JANUARY );
-                }
-
-
-                // Output the date as the key and 1 as the value
-                date.setDate( calendar.getTime() );
-                output.collect(date, one);
-            }
-        }
-    }
+  public static class LogMapClass extends MapReduceBase
+      implements Mapper<LongWritable, Text, DateWritable, IntWritable>
+  {
+    private DateWritable date = new DateWritable();
+    private static final IntWritable one = new IntWritable(1);
 
-    public static class LogReduce extends MapReduceBase
-            implements Reducer<DateWritable, IntWritable, DateWritable, IntWritable>
+    public void map(LongWritable key, Text value, OutputCollector<DateWritable, IntWritable> output, Reporter reporter) throws IOException
     {
-        public void reduce( DateWritable key, Iterator<IntWritable> values,
-                            OutputCollector<DateWritable, IntWritable> output,
-                            Reporter reporter) throws IOException
-        {
-            // Iterate over all of the values (counts of occurrences of this word)
-            int count = 0;
-            while( values.hasNext() )
-            {
-                // Add the value to our count
-                count += values.next().get();
-            }
-
-            // Output the word with its count (wrapped in an IntWritable)
-            output.collect( key, new IntWritable( count ) );
+      // Get the value as a String; it is of the format:
+      // 111.111.111.111 - - [16/Dec/2012:05:32:50 -0500] "GET / HTTP/1.1" 200 14791 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
+      String text = value.toString();
+
+      // Get the date and time
+      int openBracket = text.indexOf('[');
+      int closeBracket = text.indexOf(']');
+      if (openBracket != -1 && closeBracket != -1) {
+        // Read the date
+        String dateString = text.substring(text.indexOf('[') + 1, text.indexOf(']'));
+
+        // Build a date object from a string of the form: 16/Dec/2012:05:32:50 -0500
+        int index = 0;
+        int nextIndex = dateString.indexOf('/');
+        int day = Integer.parseInt(dateString.substring(index, nextIndex));
+
+        index = nextIndex;
+        nextIndex = dateString.indexOf('/', index + 1);
+        String month = dateString.substring(index + 1, nextIndex);
+
+        index = nextIndex;
+        nextIndex = dateString.indexOf(':', index);
+        int year = Integer.parseInt(dateString.substring(index + 1, nextIndex));
+
+        index = nextIndex;
+        nextIndex = dateString.indexOf(':', index + 1);
+        int hour = Integer.parseInt(dateString.substring(index + 1, nextIndex));
+
+        // Build a calendar object for this date
+        Calendar calendar = Calendar.getInstance();
+        calendar.set(Calendar.DATE, day);
+        calendar.set(Calendar.YEAR, year);
+        calendar.set(Calendar.HOUR, hour);
+        calendar.set(Calendar.MINUTE, 0);
+        calendar.set(Calendar.SECOND, 0);
+        calendar.set(Calendar.MILLISECOND, 0);
+
+        if (month.equalsIgnoreCase("dec")) {
+          calendar.set(Calendar.MONTH, Calendar.DECEMBER);
+        } else if (month.equalsIgnoreCase("nov")) {
+          calendar.set(Calendar.MONTH, Calendar.NOVEMBER);
+        } else if (month.equalsIgnoreCase("oct")) {
+          calendar.set(Calendar.MONTH, Calendar.OCTOBER);
+        } else if (month.equalsIgnoreCase("sep")) {
+          calendar.set(Calendar.MONTH, Calendar.SEPTEMBER);
+        } else if (month.equalsIgnoreCase("aug")) {
+          calendar.set(Calendar.MONTH, Calendar.AUGUST);
+        } else if (month.equalsIgnoreCase("jul")) {
+          calendar.set(Calendar.MONTH, Calendar.JULY);
+        } else if (month.equalsIgnoreCase("jun")) {
+          calendar.set(Calendar.MONTH, Calendar.JUNE);
+        } else if (month.equalsIgnoreCase("may")) {
+          calendar.set(Calendar.MONTH, Calendar.MAY);
+        } else if (month.equalsIgnoreCase("apr")) {
+          calendar.set(Calendar.MONTH, Calendar.APRIL);
+        } else if (month.equalsIgnoreCase("mar")) {
+          calendar.set(Calendar.MONTH, Calendar.MARCH);
+        } else if (month.equalsIgnoreCase("feb")) {
+          calendar.set(Calendar.MONTH, Calendar.FEBRUARY);
+        } else if (month.equalsIgnoreCase("jan")) {
+          calendar.set(Calendar.MONTH, Calendar.JANUARY);
         }
-    }
 
 
-    public int run(String[] args) throws Exception
-    {
-        // Create a configuration
-        Configuration conf = getConf();
-
-        // Create a job from the default configuration that will use the WordCount class
-        JobConf job = new JobConf( conf, LogCountsPerHour.class );
-
-        // Define our input path as the first command line argument and our output path as the second
-        Path in = new Path( args[0] );
-        Path out = new Path( args[1] );
-
-        // Create File Input/Output formats for these paths (in the job)
-        FileInputFormat.setInputPaths( job, in );
-        FileOutputFormat.setOutputPath( job, out );
-
-        // Configure the job: name, mapper, reducer, and combiner
-        job.setJobName( "LogAveragePerHour" );
-        job.setMapperClass( LogMapClass.class );
-        job.setReducerClass( LogReduce.class );
-        job.setCombinerClass( LogReduce.class );
-
-        // Configure the output
-        job.setOutputFormat( TextOutputFormat.class );
-        job.setOutputKeyClass( DateWritable.class );
-        job.setOutputValueClass( IntWritable.class );
-
-        // Run the job
-        JobClient.runJob(job);
-        return 0;
+        // Output the date as the key and 1 as the value
+        date.setDate(calendar.getTime());
+        output.collect(date, one);
+      }
     }
+  }
 
-    public static void main(String[] args) throws Exception
+  public static class LogReduce extends MapReduceBase
+      implements Reducer<DateWritable, IntWritable, DateWritable, IntWritable>
+  {
+    public void reduce(DateWritable key, Iterator<IntWritable> values, OutputCollector<DateWritable, IntWritable> output, Reporter reporter) throws IOException
     {
-        // Start the LogCountsPerHour MapReduce application
-        int res = ToolRunner.run( new Configuration(),
-                new LogCountsPerHour(),
-                args );
-        System.exit( res );
+      // Iterate over all of the values (partial counts of log entries for this date/hour key)
+      int count = 0;
+      while (values.hasNext()) {
+        // Add the value to our count
+        count += values.next().get();
+      }
+
+      // Output the date key with its total count (wrapped in an IntWritable)
+      output.collect(key, new IntWritable(count));
     }
+  }
+
+
+  public int run(String[] args) throws Exception
+  {
+    // Create a configuration
+    Configuration conf = getConf();
+
+    // Create a job from the default configuration that will use the LogCountsPerHour class
+    JobConf job = new JobConf(conf, LogCountsPerHour.class);
+
+    // Define our input path as the first command line argument and our output path as the second
+    Path in = new Path(args[0]);
+    Path out = new Path(args[1]);
+
+    // Create File Input/Output formats for these paths (in the job)
+    FileInputFormat.setInputPaths(job, in);
+    FileOutputFormat.setOutputPath(job, out);
+
+    // Configure the job: name, mapper, reducer, and combiner
+    job.setJobName("LogAveragePerHour");
+    job.setMapperClass(LogMapClass.class);
+    job.setReducerClass(LogReduce.class);
+    job.setCombinerClass(LogReduce.class);
+
+    // Configure the output
+    job.setOutputFormat(TextOutputFormat.class);
+    job.setOutputKeyClass(DateWritable.class);
+    job.setOutputValueClass(IntWritable.class);
+
+    // Run the job
+    JobClient.runJob(job);
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception
+  {
+    // Start the LogCountsPerHour MapReduce application
+    int res = ToolRunner.run(new Configuration(), new LogCountsPerHour(), args);
+    System.exit(res);
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogsCountApplication.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogsCountApplication.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogsCountApplication.java
index cbe5566..2d647ed 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogsCountApplication.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogsCountApplication.java
@@ -30,7 +30,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
  *
  * @since 0.9.0
  */
-@ApplicationAnnotation(name="LogsCountDemo")
+@ApplicationAnnotation(name = "LogsCountDemo")
 public class LogsCountApplication extends MapReduceApplication<LongWritable, Text, DateWritable, IntWritable>
 {
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java
index b8023f5..509f6ae 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java
@@ -22,18 +22,35 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 import javax.validation.constraints.Min;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.mapred.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
 
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultOutputPort;
@@ -123,8 +140,7 @@ public class MapOperator<K1, V1, K2, V2>  implements InputOperator, Partitioner<
     if (reader == null) {
       try {
         reader = inputFormat.getRecordReader(inputSplit, new JobConf(new Configuration()), reporter);
-      }
-      catch (IOException e) {
+      } catch (IOException e) {
         logger.info("error getting record reader {}", e.getMessage());
       }
     }
@@ -150,11 +166,10 @@ public class MapOperator<K1, V1, K2, V2>  implements InputOperator, Partitioner<
       SerializationFactory serializationFactory = new SerializationFactory(conf);
       Deserializer keyDesiralizer = serializationFactory.getDeserializer(inputSplitClass);
       keyDesiralizer.open(new ByteArrayInputStream(outstream.toByteArray()));
-      inputSplit = (InputSplit) keyDesiralizer.deserialize(null);
-      ((ReporterImpl) reporter).setInputSplit(inputSplit);
+      inputSplit = (InputSplit)keyDesiralizer.deserialize(null);
+      ((ReporterImpl)reporter).setInputSplit(inputSplit);
       reader = inputFormat.getRecordReader(inputSplit, new JobConf(conf), reporter);
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       logger.info("failed to initialize inputformat obj {}", inputFormat);
       throw new RuntimeException(e);
     }
@@ -172,8 +187,7 @@ public class MapOperator<K1, V1, K2, V2>  implements InputOperator, Partitioner<
     if (mapClass != null) {
       try {
         mapObject = mapClass.newInstance();
-      }
-      catch (Exception e) {
+      } catch (Exception e) {
         logger.info("can't instantiate object {}", e.getMessage());
       }
 
@@ -182,8 +196,7 @@ public class MapOperator<K1, V1, K2, V2>  implements InputOperator, Partitioner<
     if (combineClass != null) {
       try {
         combineObject = combineClass.newInstance();
-      }
-      catch (Exception e) {
+      } catch (Exception e) {
         logger.info("can't instantiate object {}", e.getMessage());
       }
       combineObject.configure(jobConf);
@@ -202,15 +215,14 @@ public class MapOperator<K1, V1, K2, V2>  implements InputOperator, Partitioner<
           KeyHashValPair<K1, V1> keyValue = new KeyHashValPair<K1, V1>(key, val);
           mapObject.map(keyValue.getKey(), keyValue.getValue(), outputCollector, reporter);
           if (combineObject == null) {
-            List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>) outputCollector).getList();
+            List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>)outputCollector).getList();
             for (KeyHashValPair<K2, V2> e : list) {
               output.emit(e);
             }
             list.clear();
           }
         }
-      }
-      catch (IOException ex) {
+      } catch (IOException ex) {
         logger.debug(ex.toString());
         throw new RuntimeException(ex);
       }
@@ -220,7 +232,7 @@ public class MapOperator<K1, V1, K2, V2>  implements InputOperator, Partitioner<
   @Override
   public void endWindow()
   {
-    List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>) outputCollector).getList();
+    List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>)outputCollector).getList();
     if (combineObject != null) {
       Map<K2, List<V2>> cacheObject = new HashMap<K2, List<V2>>();
       for (KeyHashValPair<K2, V2> tuple : list) {
@@ -229,8 +241,7 @@ public class MapOperator<K1, V1, K2, V2>  implements InputOperator, Partitioner<
           cacheList = new ArrayList<V2>();
           cacheList.add(tuple.getValue());
           cacheObject.put(tuple.getKey(), cacheList);
-        }
-        else {
+        } else {
           cacheList.add(tuple.getValue());
         }
       }
@@ -239,12 +250,11 @@ public class MapOperator<K1, V1, K2, V2>  implements InputOperator, Partitioner<
       for (Map.Entry<K2, List<V2>> e : cacheObject.entrySet()) {
         try {
           combineObject.reduce(e.getKey(), e.getValue().iterator(), tempOutputCollector, reporter);
-        }
-        catch (IOException e1) {
+        } catch (IOException e1) {
           logger.info(e1.getMessage());
         }
       }
-      list = ((OutputCollectorImpl<K2, V2>) tempOutputCollector).getList();
+      list = ((OutputCollectorImpl<K2, V2>)tempOutputCollector).getList();
       for (KeyHashValPair<K2, V2> e : list) {
         output.emit(e);
       }
@@ -261,14 +271,13 @@ public class MapOperator<K1, V1, K2, V2>  implements InputOperator, Partitioner<
   {
     FileInputFormat.setInputPaths(conf, new Path(path));
     if (inputFormat == null) {
-        inputFormat = inputFormatClass.newInstance();
-        String inputFormatClassName = inputFormatClass.getName();
-        if (inputFormatClassName.equals("org.apache.hadoop.mapred.TextInputFormat")) {
-          ((TextInputFormat) inputFormat).configure(conf);
-        }
-        else if (inputFormatClassName.equals("org.apache.hadoop.mapred.KeyValueTextInputFormat")) {
-          ((KeyValueTextInputFormat) inputFormat).configure(conf);
-        }
+      inputFormat = inputFormatClass.newInstance();
+      String inputFormatClassName = inputFormatClass.getName();
+      if (inputFormatClassName.equals("org.apache.hadoop.mapred.TextInputFormat")) {
+        ((TextInputFormat)inputFormat).configure(conf);
+      } else if (inputFormatClassName.equals("org.apache.hadoop.mapred.KeyValueTextInputFormat")) {
+        ((KeyValueTextInputFormat)inputFormat).configure(conf);
+      }
     }
     return inputFormat.getSplits(conf, numSplits);
     // return null;
@@ -296,8 +305,7 @@ public class MapOperator<K1, V1, K2, V2>  implements InputOperator, Partitioner<
       InputSplit[] splits;
       try {
         splits = getSplits(new JobConf(conf), tempPartitionCount, template.getPartitionedInstance().getDirName());
-      }
-      catch (Exception e1) {
+      } catch (Exception e1) {
         logger.info(" can't get splits {}", e1.getMessage());
         throw new RuntimeException(e1);
       }
@@ -316,8 +324,7 @@ public class MapOperator<K1, V1, K2, V2>  implements InputOperator, Partitioner<
           keySerializer.open(opr.getOutstream());
           keySerializer.serialize(splits[size - 1]);
           opr.setInputSplitClass(splits[size - 1].getClass());
-        }
-        catch (IOException e) {
+        } catch (IOException e) {
           logger.info("error while serializing {}", e.getMessage());
         }
         size--;
@@ -333,8 +340,7 @@ public class MapOperator<K1, V1, K2, V2>  implements InputOperator, Partitioner<
           keySerializer.open(opr.getOutstream());
           keySerializer.serialize(splits[size - 1]);
           opr.setInputSplitClass(splits[size - 1].getClass());
-        }
-        catch (IOException e) {
+        } catch (IOException e) {
           logger.info("error while serializing {}", e.getMessage());
         }
         size--;
@@ -342,8 +348,7 @@ public class MapOperator<K1, V1, K2, V2>  implements InputOperator, Partitioner<
       }
       try {
         keySerializer.close();
-      }
-      catch (IOException e) {
+      } catch (IOException e) {
         throw new RuntimeException(e);
       }
       return operList;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/NewWordCountApplication.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/NewWordCountApplication.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/NewWordCountApplication.java
index 45f9005..b0ea7d8 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/NewWordCountApplication.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/NewWordCountApplication.java
@@ -30,15 +30,15 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
  *
  * @since 0.9.0
  */
-@ApplicationAnnotation(name="WordCountDemo")
-public class NewWordCountApplication extends MapReduceApplication<LongWritable, Text, Text, IntWritable> {
-
-	public void NewWordCountApplication() {
-		setMapClass(WordCount.Map.class);
-		setReduceClass(WordCount.Reduce.class);
-		setCombineClass(WordCount.Reduce.class);
-		setInputFormat(TextInputFormat.class);
-
-	}
+@ApplicationAnnotation(name = "WordCountDemo")
+public class NewWordCountApplication extends MapReduceApplication<LongWritable, Text, Text, IntWritable>
+{
 
+  public void NewWordCountApplication()
+  {
+    setMapClass(WordCount.Map.class);
+    setReduceClass(WordCount.Reduce.class);
+    setCombineClass(WordCount.Reduce.class);
+    setInputFormat(TextInputFormat.class);
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/OutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/OutputCollectorImpl.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/OutputCollectorImpl.java
index b380553..6c81724 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/OutputCollectorImpl.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/OutputCollectorImpl.java
@@ -24,14 +24,14 @@ import java.io.PipedOutputStream;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapred.OutputCollector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.datatorrent.lib.util.KeyHashValPair;
 
 /**
@@ -40,50 +40,55 @@ import com.datatorrent.lib.util.KeyHashValPair;
  * @since 0.9.0
  */
 @SuppressWarnings("unchecked")
-public class OutputCollectorImpl<K extends Object, V extends Object> implements OutputCollector<K, V> {
-	private static final Logger logger = LoggerFactory.getLogger(OutputCollectorImpl.class);
+public class OutputCollectorImpl<K extends Object, V extends Object> implements OutputCollector<K, V>
+{
+  private static final Logger logger = LoggerFactory.getLogger(OutputCollectorImpl.class);
 
-	private List<KeyHashValPair<K, V>> list = new ArrayList<KeyHashValPair<K, V>>();
+  private List<KeyHashValPair<K, V>> list = new ArrayList<KeyHashValPair<K, V>>();
 
-	public List<KeyHashValPair<K, V>> getList() {
-		return list;
-	}
+  public List<KeyHashValPair<K, V>> getList()
+  {
+    return list;
+  }
 
-	private transient SerializationFactory serializationFactory;
-	private transient Configuration conf = null;
+  private transient SerializationFactory serializationFactory;
+  private transient Configuration conf = null;
 
-	public OutputCollectorImpl() {
-		conf = new Configuration();
-		serializationFactory = new SerializationFactory(conf);
+  public OutputCollectorImpl()
+  {
+    conf = new Configuration();
+    serializationFactory = new SerializationFactory(conf);
 
-	}
+  }
 
-	private <T> T cloneObj(T t) throws IOException {
-		Serializer<T> keySerializer;
-		Class<T> keyClass;
-		PipedInputStream pis = new PipedInputStream();
-		PipedOutputStream pos = new PipedOutputStream(pis);
-		keyClass = (Class<T>) t.getClass();
-		keySerializer = serializationFactory.getSerializer(keyClass);
-		keySerializer.open(pos);
-		keySerializer.serialize(t);
-		Deserializer<T> keyDesiralizer = serializationFactory.getDeserializer(keyClass);
-		keyDesiralizer.open(pis);
-		T clonedArg0 = keyDesiralizer.deserialize(null);
-		pos.close();
-		pis.close();
-		keySerializer.close();
-		keyDesiralizer.close();
-		return clonedArg0;
+  private <T> T cloneObj(T t) throws IOException
+  {
+    Serializer<T> keySerializer;
+    Class<T> keyClass;
+    PipedInputStream pis = new PipedInputStream();
+    PipedOutputStream pos = new PipedOutputStream(pis);
+    keyClass = (Class<T>)t.getClass();
+    keySerializer = serializationFactory.getSerializer(keyClass);
+    keySerializer.open(pos);
+    keySerializer.serialize(t);
+    Deserializer<T> keyDesiralizer = serializationFactory.getDeserializer(keyClass);
+    keyDesiralizer.open(pis);
+    T clonedArg0 = keyDesiralizer.deserialize(null);
+    pos.close();
+    pis.close();
+    keySerializer.close();
+    keyDesiralizer.close();
+    return clonedArg0;
 
-	}
+  }
 
-	@Override
-	public void collect(K arg0, V arg1) throws IOException {
-		if (conf == null) {
-			conf = new Configuration();
-			serializationFactory = new SerializationFactory(conf);
-		}
-		list.add(new KeyHashValPair<K, V>(cloneObj(arg0), cloneObj(arg1)));
-	}
+  @Override
+  public void collect(K arg0, V arg1) throws IOException
+  {
+    if (conf == null) {
+      conf = new Configuration();
+      serializationFactory = new SerializationFactory(conf);
+    }
+    list.add(new KeyHashValPair<K, V>(cloneObj(arg0), cloneObj(arg1)));
+  }
 }