You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/05/18 20:42:03 UTC

[13/22] incubator-apex-malhar git commit: APEXMALHAR-2095 removed checkstyle violations of malhar library module

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/QuotientMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/QuotientMap.java b/library/src/main/java/com/datatorrent/lib/math/QuotientMap.java
index c114bad..a10fe95 100644
--- a/library/src/main/java/com/datatorrent/lib/math/QuotientMap.java
+++ b/library/src/main/java/com/datatorrent/lib/math/QuotientMap.java
@@ -23,11 +23,11 @@ import java.util.Map;
 
 import javax.validation.constraints.Min;
 
-import com.datatorrent.api.annotation.OperatorAnnotation;
 import org.apache.commons.lang.mutable.MutableDouble;
 
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
 
 /**
@@ -63,175 +63,175 @@ import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
  */
 @OperatorAnnotation(partitionable = false)
 public class QuotientMap<K, V extends Number> extends
-		BaseNumberKeyValueOperator<K, V>
+    BaseNumberKeyValueOperator<K, V>
 {
-	/**
-	 * Numerator key/sum value map.
-	 */
-	protected HashMap<K, MutableDouble> numerators = new HashMap<K, MutableDouble>();
-
-	/**
-	 * Denominator key/sum value map.
-	 */
-	protected HashMap<K, MutableDouble> denominators = new HashMap<K, MutableDouble>();
-
-	/**
-	 * Count occurrence of keys if set to true.
-	 */
-	boolean countkey = false;
-
-	/**
-	 * Quotient multiply by value.
-	 */
-	int mult_by = 1;
-
-	/**
-	 * Numerator input port.
-	 */
-	public final transient DefaultInputPort<Map<K, V>> numerator = new DefaultInputPort<Map<K, V>>()
-	{
-		/**
-		 * Added tuple to the numerator hash
-		 */
-		@Override
-		public void process(Map<K, V> tuple)
-		{
-			addTuple(tuple, numerators);
-		}
-	};
-
-	/**
-	 * Denominator input port.
-	 */
-	public final transient DefaultInputPort<Map<K, V>> denominator = new DefaultInputPort<Map<K, V>>()
-	{
-		/**
-		 * Added tuple to the denominator hash
-		 */
-		@Override
-		public void process(Map<K, V> tuple)
-		{
-			addTuple(tuple, denominators);
-		}
-	};
-
-	/**
-	 * Quotient output port.
-	 */
-	public final transient DefaultOutputPort<HashMap<K, Double>> quotient = new DefaultOutputPort<HashMap<K, Double>>();
-
-	/**
-	 * Add tuple to nval/dval map.
-	 * 
-	 * @param tuple
-	 *          key/value map on input port.
-	 * @param map
-	 *          key/summed value map.
-	 */
-	public void addTuple(Map<K, V> tuple, Map<K, MutableDouble> map)
-	{
-		for (Map.Entry<K, V> e : tuple.entrySet()) {
-			addEntry(e.getKey(), e.getValue(), map);
-		}
-	}
-
-	/**
-	 * Add/Update entry to key/sum value map.
-	 * 
-	 * @param key
-	 *          name.
-	 * @param value
-	 *          value for key.
-	 * @param map
-	 *          numerator/denominator key/sum map.
-	 */
-	public void addEntry(K key, V value, Map<K, MutableDouble> map)
-	{
-		if (!doprocessKey(key) || (value == null)) {
-			return;
-		}
-		MutableDouble val = map.get(key);
-		if (val == null) {
-			if (countkey) {
-				val = new MutableDouble(1.00);
-			} else {
-				val = new MutableDouble(value.doubleValue());
-			}
-		} else {
-			if (countkey) {
-				val.increment();
-			} else {
-				val.add(value.doubleValue());
-			}
-		}
-		map.put(cloneKey(key), val);
-	}
-
-	/**
-	 * getter for mult_by
-	 * 
-	 * @return mult_by
-	 */
-
-	@Min(0)
-	public int getMult_by()
-	{
-		return mult_by;
-	}
-
-	/**
-	 * getter for countkey
-	 * 
-	 * @return countkey
-	 */
-	public boolean getCountkey()
-	{
-		return countkey;
-	}
-
-	/**
-	 * Setter for mult_by
-	 * 
-	 * @param i
-	 */
-	public void setMult_by(int i)
-	{
-		mult_by = i;
-	}
-
-	/**
-	 * setter for countkey
-	 * 
-	 * @param i
-	 *          sets countkey
-	 */
-	public void setCountkey(boolean i)
-	{
-		countkey = i;
-	}
-
-	/**
-	 * Generates tuples for each key and emits them. Only keys that are in the
-	 * denominator are iterated on If the key is only in the numerator, it gets
-	 * ignored (cannot do divide by 0) Clears internal data
-	 */
-	@Override
-	public void endWindow()
-	{
-		HashMap<K, Double> tuples = new HashMap<K, Double>();
-		for (Map.Entry<K, MutableDouble> e : denominators.entrySet()) {
-			MutableDouble nval = numerators.get(e.getKey());
-			if (nval == null) {
-				tuples.put(e.getKey(), new Double(0.0));
-			} else {
-				tuples.put(e.getKey(), new Double((nval.doubleValue() / e.getValue()
-						.doubleValue()) * mult_by));
-			}
-		}
-		if (!tuples.isEmpty()) {
-			quotient.emit(tuples);
-		}
-		numerators.clear();
-		denominators.clear();
-	}
+  /**
+   * Numerator key/sum value map.
+   */
+  protected HashMap<K, MutableDouble> numerators = new HashMap<K, MutableDouble>();
+
+  /**
+   * Denominator key/sum value map.
+   */
+  protected HashMap<K, MutableDouble> denominators = new HashMap<K, MutableDouble>();
+
+  /**
+   * Count occurrence of keys if set to true.
+   */
+  boolean countkey = false;
+
+  /**
+   * Quotient multiply by value.
+   */
+  int mult_by = 1;
+
+  /**
+   * Numerator input port.
+   */
+  public final transient DefaultInputPort<Map<K, V>> numerator = new DefaultInputPort<Map<K, V>>()
+  {
+    /**
+     * Added tuple to the numerator hash
+     */
+    @Override
+    public void process(Map<K, V> tuple)
+    {
+      addTuple(tuple, numerators);
+    }
+  };
+
+  /**
+   * Denominator input port.
+   */
+  public final transient DefaultInputPort<Map<K, V>> denominator = new DefaultInputPort<Map<K, V>>()
+  {
+    /**
+     * Added tuple to the denominator hash
+     */
+    @Override
+    public void process(Map<K, V> tuple)
+    {
+      addTuple(tuple, denominators);
+    }
+  };
+
+  /**
+   * Quotient output port.
+   */
+  public final transient DefaultOutputPort<HashMap<K, Double>> quotient = new DefaultOutputPort<HashMap<K, Double>>();
+
+  /**
+   * Add tuple to nval/dval map.
+   *
+   * @param tuple
+   *          key/value map on input port.
+   * @param map
+   *          key/summed value map.
+   */
+  public void addTuple(Map<K, V> tuple, Map<K, MutableDouble> map)
+  {
+    for (Map.Entry<K, V> e : tuple.entrySet()) {
+      addEntry(e.getKey(), e.getValue(), map);
+    }
+  }
+
+  /**
+   * Add/Update entry to key/sum value map.
+   *
+   * @param key
+   *          name.
+   * @param value
+   *          value for key.
+   * @param map
+   *          numerator/denominator key/sum map.
+   */
+  public void addEntry(K key, V value, Map<K, MutableDouble> map)
+  {
+    if (!doprocessKey(key) || (value == null)) {
+      return;
+    }
+    MutableDouble val = map.get(key);
+    if (val == null) {
+      if (countkey) {
+        val = new MutableDouble(1.00);
+      } else {
+        val = new MutableDouble(value.doubleValue());
+      }
+    } else {
+      if (countkey) {
+        val.increment();
+      } else {
+        val.add(value.doubleValue());
+      }
+    }
+    map.put(cloneKey(key), val);
+  }
+
+  /**
+   * getter for mult_by
+   *
+   * @return mult_by
+   */
+
+  @Min(0)
+  public int getMult_by()
+  {
+    return mult_by;
+  }
+
+  /**
+   * getter for countkey
+   *
+   * @return countkey
+   */
+  public boolean getCountkey()
+  {
+    return countkey;
+  }
+
+  /**
+   * Setter for mult_by
+   *
+   * @param i
+   */
+  public void setMult_by(int i)
+  {
+    mult_by = i;
+  }
+
+  /**
+   * setter for countkey
+   *
+   * @param i
+   *          sets countkey
+   */
+  public void setCountkey(boolean i)
+  {
+    countkey = i;
+  }
+
+  /**
+   * Generates tuples for each key and emits them. Only keys that are in the
+   * denominator are iterated on If the key is only in the numerator, it gets
+   * ignored (cannot do divide by 0) Clears internal data
+   */
+  @Override
+  public void endWindow()
+  {
+    HashMap<K, Double> tuples = new HashMap<K, Double>();
+    for (Map.Entry<K, MutableDouble> e : denominators.entrySet()) {
+      MutableDouble nval = numerators.get(e.getKey());
+      if (nval == null) {
+        tuples.put(e.getKey(), new Double(0.0));
+      } else {
+        tuples.put(e.getKey(), new Double((nval.doubleValue() / e.getValue()
+            .doubleValue()) * mult_by));
+      }
+    }
+    if (!tuples.isEmpty()) {
+      quotient.emit(tuples);
+    }
+    numerators.clear();
+    denominators.clear();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Range.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Range.java b/library/src/main/java/com/datatorrent/lib/math/Range.java
index 9a7ab7a..ad54d4d 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Range.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Range.java
@@ -43,62 +43,62 @@ import com.datatorrent.lib.util.UnifierRange;
  */
 public class Range<V extends Number> extends BaseNumberValueOperator<V>
 {
-	/**
-	 * Highest value on input port.
-	 */
-	protected V high = null;
+  /**
+   * Highest value on input port.
+   */
+  protected V high = null;
 
-	/**
-	 * Lowest value on input port.
-	 */
-	protected V low = null;
+  /**
+   * Lowest value on input port.
+   */
+  protected V low = null;
 
-	/**
-	 * Input data port.
-	 */
-	public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
-	{
-		/**
-		 * Process each tuple to compute new high and low
-		 */
-		@Override
-		public void process(V tuple)
-		{
-			if ((low == null) || (low.doubleValue() > tuple.doubleValue())) {
-				low = tuple;
-			}
+  /**
+   * Input data port.
+   */
+  public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
+  {
+    /**
+     * Process each tuple to compute new high and low
+     */
+    @Override
+    public void process(V tuple)
+    {
+      if ((low == null) || (low.doubleValue() > tuple.doubleValue())) {
+        low = tuple;
+      }
 
-			if ((high == null) || (high.doubleValue() < tuple.doubleValue())) {
-				high = tuple;
-			}
-		}
-	};
+      if ((high == null) || (high.doubleValue() < tuple.doubleValue())) {
+        high = tuple;
+      }
+    }
+  };
 
-	/**
-	 * Output range port, which emits high low unifier operator.
-	 */
-	public final transient DefaultOutputPort<HighLow<V>> range = new DefaultOutputPort<HighLow<V>>()
-	{
-		@Override
-		public Unifier<HighLow<V>> getUnifier()
-		{
-			return new UnifierRange<V>();
-		}
-	};
+  /**
+   * Output range port, which emits high low unifier operator.
+   */
+  public final transient DefaultOutputPort<HighLow<V>> range = new DefaultOutputPort<HighLow<V>>()
+  {
+    @Override
+    public Unifier<HighLow<V>> getUnifier()
+    {
+      return new UnifierRange<V>();
+    }
+  };
 
-	/**
-	 * Emits the range. If no tuple was received in the window, no emit is done
-	 * Clears the internal data before return
-	 */
-	@Override
-	public void endWindow()
-	{
-		if ((low != null) && (high != null)) {
-			HighLow tuple = new HighLow(getValue(high.doubleValue()),
-					getValue(low.doubleValue()));
-			range.emit(tuple);
-		}
-		high = null;
-		low = null;
-	}
+  /**
+   * Emits the range. If no tuple was received in the window, no emit is done
+   * Clears the internal data before return
+   */
+  @Override
+  public void endWindow()
+  {
+    if ((low != null) && (high != null)) {
+      HighLow tuple = new HighLow(getValue(high.doubleValue()),
+          getValue(low.doubleValue()));
+      range.emit(tuple);
+    }
+    high = null;
+    low = null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/RangeKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/RangeKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/RangeKeyVal.java
index a006415..241f482 100644
--- a/library/src/main/java/com/datatorrent/lib/math/RangeKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/RangeKeyVal.java
@@ -21,15 +21,14 @@ package com.datatorrent.lib.math;
 import java.util.HashMap;
 import java.util.Map;
 
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.StreamCodec;
 import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
 import com.datatorrent.lib.util.HighLow;
 import com.datatorrent.lib.util.KeyValPair;
 import com.datatorrent.lib.util.UnifierKeyValRange;
 
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.StreamCodec;
-
 /**
  *  This operator emits the range for each key at the end of window.
  * <p>
@@ -50,83 +49,82 @@ import com.datatorrent.api.StreamCodec;
  * @tags range, number, comparison, key value
  * @since 0.3.3
  */
-public class RangeKeyVal<K, V extends Number> extends
-		BaseNumberKeyValueOperator<K, V>
+public class RangeKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K, V>
 {
 
-	/**
-	 * key/high value map.
-	 */
-	protected HashMap<K, V> high = new HashMap<K, V>();
+  /**
+   * key/high value map.
+   */
+  protected HashMap<K, V> high = new HashMap<K, V>();
 
-	/**
-	 * key/low value map.
-	 */
-	protected HashMap<K, V> low = new HashMap<K, V>();
+  /**
+   * key/low value map.
+   */
+  protected HashMap<K, V> low = new HashMap<K, V>();
 
-	/**
-	 *  Input port that takes a key value pair.
-	 */
-	public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
-	{
-		/**
-		 * Process each key and computes new high and low.
-		 */
-		@Override
-		public void process(KeyValPair<K, V> tuple)
-		{
-			K key = tuple.getKey();
-			if (!doprocessKey(key) || (tuple.getValue() == null)) {
-				return;
-			}
-			V val = low.get(key);
-			V eval = tuple.getValue();
-			if ((val == null) || (val.doubleValue() > eval.doubleValue())) {
-				low.put(cloneKey(key), eval);
-			}
+  /**
+   *  Input port that takes a key value pair.
+   */
+  public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
+  {
+    /**
+     * Process each key and computes new high and low.
+     */
+    @Override
+    public void process(KeyValPair<K, V> tuple)
+    {
+      K key = tuple.getKey();
+      if (!doprocessKey(key) || (tuple.getValue() == null)) {
+        return;
+      }
+      V val = low.get(key);
+      V eval = tuple.getValue();
+      if ((val == null) || (val.doubleValue() > eval.doubleValue())) {
+        low.put(cloneKey(key), eval);
+      }
 
-			val = high.get(key);
-			if ((val == null) || (val.doubleValue() < eval.doubleValue())) {
-				high.put(cloneKey(key), eval);
-			}
-		}
+      val = high.get(key);
+      if ((val == null) || (val.doubleValue() < eval.doubleValue())) {
+        high.put(cloneKey(key), eval);
+      }
+    }
 
-		@Override
-		public StreamCodec<KeyValPair<K, V>> getStreamCodec()
-		{
-			return getKeyValPairStreamCodec();
-		}
-	};
+    @Override
+    public StreamCodec<KeyValPair<K, V>> getStreamCodec()
+    {
+      return getKeyValPairStreamCodec();
+    }
+  };
 
-	/**
-	 * Range output port to send out the high low range.
-	 */
-	public final transient DefaultOutputPort<KeyValPair<K, HighLow<V>>> range = new DefaultOutputPort<KeyValPair<K, HighLow<V>>>()
-	{
-		@Override
-		public Unifier<KeyValPair<K, HighLow<V>>> getUnifier()
-		{
-			return new UnifierKeyValRange<K,V>();
-		}
-	};
+  /**
+   * Range output port to send out the high low range.
+   */
+  public final transient DefaultOutputPort<KeyValPair<K, HighLow<V>>> range = new DefaultOutputPort<KeyValPair<K, HighLow<V>>>()
+  {
+    @Override
+    public Unifier<KeyValPair<K, HighLow<V>>> getUnifier()
+    {
+      return new UnifierKeyValRange<K,V>();
+    }
+  };
 
-	/**
-	 * Emits range for each key. If no data is received, no emit is done Clears
-	 * the internal data before return
-	 */
-	@Override
-	public void endWindow()
-	{
-		for (Map.Entry<K, V> e : high.entrySet()) {
-			range.emit(new KeyValPair<K, HighLow<V>>(e.getKey(), new HighLow(e
-					.getValue(), low.get(e.getKey()))));
-		}
-		clearCache();
-	}
+  /**
+   * Emits range for each key. If no data is received, no emit is done Clears
+   * the internal data before return
+   */
+  @Override
+  public void endWindow()
+  {
+    for (Map.Entry<K, V> e : high.entrySet()) {
+      range.emit(new KeyValPair<K, HighLow<V>>(e.getKey(), new HighLow(e
+          .getValue(), low.get(e.getKey()))));
+    }
+    clearCache();
+  }
 
-	public void clearCache()
-	{
-		high.clear();
-		low.clear();
-	}
+  public void clearCache()
+  {
+    high.clear();
+    low.clear();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java b/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java
index 24f09f9..286d72e 100644
--- a/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java
+++ b/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java
@@ -18,10 +18,10 @@
  */
 package com.datatorrent.lib.math;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * Calculate the running average of the input numbers and emit it at the end of the window. 
@@ -46,70 +46,70 @@ import com.datatorrent.api.annotation.OperatorAnnotation;
 @OperatorAnnotation(partitionable = false)
 public class RunningAverage extends BaseOperator
 {
-	/**
-	 * Computed average.
-	 */
-	double average;
+  /**
+   * Computed average.
+   */
+  double average;
 
-	/**
-	 * Number of values on input port.
-	 */
-	long count;
+  /**
+   * Number of values on input port.
+   */
+  long count;
 
-	/**
-	 * Input number port.
-	 */
-	public final transient DefaultInputPort<Number> input = new DefaultInputPort<Number>()
-	{
-		@Override
-		public void process(Number tuple)
-		{
-			double sum = (count * average) + tuple.doubleValue();
-			count++;
-			average = sum / count;
-		}
-	};
+  /**
+   * Input number port.
+   */
+  public final transient DefaultInputPort<Number> input = new DefaultInputPort<Number>()
+  {
+    @Override
+    public void process(Number tuple)
+    {
+      double sum = (count * average) + tuple.doubleValue();
+      count++;
+      average = sum / count;
+    }
+  };
 
-	/**
-	 * Double average output port.
-	 */
-	public final transient DefaultOutputPort<Double> doubleAverage = new DefaultOutputPort<Double>();
+  /**
+   * Double average output port.
+   */
+  public final transient DefaultOutputPort<Double> doubleAverage = new DefaultOutputPort<Double>();
 
-	/**
-	 * Float average output port.
-	 */
-	public final transient DefaultOutputPort<Float> floatAverage = new DefaultOutputPort<Float>();
+  /**
+   * Float average output port.
+   */
+  public final transient DefaultOutputPort<Float> floatAverage = new DefaultOutputPort<Float>();
 
-	/**
-	 * Long average output port.
-	 */
-	public final transient DefaultOutputPort<Long> longAverage = new DefaultOutputPort<Long>();
+  /**
+   * Long average output port.
+   */
+  public final transient DefaultOutputPort<Long> longAverage = new DefaultOutputPort<Long>();
 
-	/**
-	 * Integer average output port.
-	 */
-	public final transient DefaultOutputPort<Integer> integerAverage = new DefaultOutputPort<Integer>();
+  /**
+   * Integer average output port.
+   */
+  public final transient DefaultOutputPort<Integer> integerAverage = new DefaultOutputPort<Integer>();
 
-	/**
-	 * End window operator override.
-	 */
-	@Override
-	public void endWindow()
-	{
-		if (doubleAverage.isConnected()) {
-			doubleAverage.emit(average);
-		}
+  /**
+   * End window operator override.
+   */
+  @Override
+  public void endWindow()
+  {
+    if (doubleAverage.isConnected()) {
+      doubleAverage.emit(average);
+    }
 
-		if (floatAverage.isConnected()) {
-			floatAverage.emit((float) average);
-		}
+    if (floatAverage.isConnected()) {
+      floatAverage.emit((float)average);
+    }
 
-		if (longAverage.isConnected()) {
-			longAverage.emit((long) average);
-		}
+    if (longAverage.isConnected()) {
+      longAverage.emit((long)average);
+    }
 
-		if (integerAverage.isConnected()) {
-			integerAverage.emit((int) average);
-		}
-	}
+    if (integerAverage.isConnected()) {
+      integerAverage.emit((int)average);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Sigma.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Sigma.java b/library/src/main/java/com/datatorrent/lib/math/Sigma.java
index 7355238..6bfb9cf 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Sigma.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Sigma.java
@@ -18,10 +18,10 @@
  */
 package com.datatorrent.lib.math;
 
-import com.datatorrent.api.annotation.OperatorAnnotation;
-
 import java.util.Collection;
 
+import com.datatorrent.api.annotation.OperatorAnnotation;
+
 /**
  * Adds incoming tuple to the state and emits the result of each addition on the respective ports.
  * <p>
@@ -47,27 +47,27 @@ import java.util.Collection;
 @OperatorAnnotation(partitionable = false)
 public class Sigma<T extends Number> extends AbstractAggregateCalc<T>
 {
-	@Override
-	public long aggregateLongs(Collection<T> collection)
-	{
-		long l = 0;
+  @Override
+  public long aggregateLongs(Collection<T> collection)
+  {
+    long l = 0;
 
-		for (Number n : collection) {
-			l += n.longValue();
-		}
+    for (Number n : collection) {
+      l += n.longValue();
+    }
 
-		return l;
-	}
+    return l;
+  }
 
-	@Override
-	public double aggregateDoubles(Collection<T> collection)
-	{
-		double d = 0;
+  @Override
+  public double aggregateDoubles(Collection<T> collection)
+  {
+    double d = 0;
 
-		for (Number n : collection) {
-			d += n.doubleValue();
-		}
+    for (Number n : collection) {
+      d += n.doubleValue();
+    }
 
-		return d;
-	}
+    return d;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Sum.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Sum.java b/library/src/main/java/com/datatorrent/lib/math/Sum.java
index 38be2fc..0f5e64f 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Sum.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Sum.java
@@ -56,204 +56,204 @@ import com.datatorrent.lib.util.UnifierSumNumber;
  * @since 0.3.3
  */
 public class Sum<V extends Number> extends BaseNumberValueOperator<V> implements
-		Unifier<V>
+    Unifier<V>
 {
-	/**
-	 * Sum value.
-	 */
-	protected double sums = 0;
+  /**
+   * Sum value.
+   */
+  protected double sums = 0;
 
-	/**
-	 * Input tuple processed flag.
-	 */
-	protected boolean tupleAvailable = false;
+  /**
+   * Input tuple processed flag.
+   */
+  protected boolean tupleAvailable = false;
 
-	/**
-	 * Accumulate sum flag.
-	 */
-	protected boolean cumulative = false;
+  /**
+   * Accumulate sum flag.
+   */
+  protected boolean cumulative = false;
 
-	/**
-	 * Input port to receive data.&nbsp; It computes sum and count for each tuple.
-	 */
-	public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
-	{
-		/**
-		 * Computes sum and count with each tuple
-		 */
-		@Override
-		public void process(V tuple)
-		{
-			Sum.this.process(tuple);
-			tupleAvailable = true;
-		}
-	};
+  /**
+   * Input port to receive data.&nbsp; It computes sum and count for each tuple.
+   */
+  public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
+  {
+    /**
+     * Computes sum and count with each tuple
+     */
+    @Override
+    public void process(V tuple)
+    {
+      Sum.this.process(tuple);
+      tupleAvailable = true;
+    }
+  };
 
-	/**
-	 * Unifier process override.
-	 */
-	@Override
-	public void process(V tuple)
-	{
-		sums += tuple.doubleValue();
-		tupleAvailable = true; // also need to set here for Unifier
-	}
+  /**
+   * Unifier process override.
+   */
+  @Override
+  public void process(V tuple)
+  {
+    sums += tuple.doubleValue();
+    tupleAvailable = true; // also need to set here for Unifier
+  }
 
-	/**
-	 * Output sum port.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<V> sum = new DefaultOutputPort<V>()
-	{
-		@Override
-		public Unifier<V> getUnifier()
-		{
-			UnifierSumNumber<V> ret = new UnifierSumNumber<V>();
-			ret.setVType(getType());
-			return ret;
-		}
-	};
+  /**
+   * Output sum port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<V> sum = new DefaultOutputPort<V>()
+  {
+    @Override
+    public Unifier<V> getUnifier()
+    {
+      UnifierSumNumber<V> ret = new UnifierSumNumber<V>();
+      ret.setVType(getType());
+      return ret;
+    }
+  };
 
-	/**
-	 * Output double sum port.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<Double> sumDouble = new DefaultOutputPort<Double>()
-	{
-		@Override
-		public Unifier<Double> getUnifier()
-		{
-			UnifierSumNumber<Double> ret = new UnifierSumNumber<Double>();
-			ret.setType(Double.class);
-			return ret;
-		}
-	};
+  /**
+   * Output double sum port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Double> sumDouble = new DefaultOutputPort<Double>()
+  {
+    @Override
+    public Unifier<Double> getUnifier()
+    {
+      UnifierSumNumber<Double> ret = new UnifierSumNumber<Double>();
+      ret.setType(Double.class);
+      return ret;
+    }
+  };
 
-	/**
-	 * Output integer sum port.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<Integer> sumInteger = new DefaultOutputPort<Integer>()
-	{
-		@Override
-		public Unifier<Integer> getUnifier()
-		{
-			UnifierSumNumber<Integer> ret = new UnifierSumNumber<Integer>();
-			ret.setType(Integer.class);
-			return ret;
-		}
-	};
+  /**
+   * Output integer sum port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Integer> sumInteger = new DefaultOutputPort<Integer>()
+  {
+    @Override
+    public Unifier<Integer> getUnifier()
+    {
+      UnifierSumNumber<Integer> ret = new UnifierSumNumber<Integer>();
+      ret.setType(Integer.class);
+      return ret;
+    }
+  };
 
-	/**
-	 * Output Long sum port.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<Long> sumLong = new DefaultOutputPort<Long>()
-	{
-		@Override
-		public Unifier<Long> getUnifier()
-		{
-			UnifierSumNumber<Long> ret = new UnifierSumNumber<Long>();
-			ret.setType(Long.class);
-			return ret;
-		}
-	};
+  /**
+   * Output Long sum port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Long> sumLong = new DefaultOutputPort<Long>()
+  {
+    @Override
+    public Unifier<Long> getUnifier()
+    {
+      UnifierSumNumber<Long> ret = new UnifierSumNumber<Long>();
+      ret.setType(Long.class);
+      return ret;
+    }
+  };
 
-	/**
-	 * Output short sum port.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<Short> sumShort = new DefaultOutputPort<Short>()
-	{
-		@Override
-		public Unifier<Short> getUnifier()
-		{
-			UnifierSumNumber<Short> ret = new UnifierSumNumber<Short>();
-			ret.setType(Short.class);
-			return ret;
-		}
-	};
+  /**
+   * Output short sum port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Short> sumShort = new DefaultOutputPort<Short>()
+  {
+    @Override
+    public Unifier<Short> getUnifier()
+    {
+      UnifierSumNumber<Short> ret = new UnifierSumNumber<Short>();
+      ret.setType(Short.class);
+      return ret;
+    }
+  };
 
-	/**
-	 * Output float sum port.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<Float> sumFloat = new DefaultOutputPort<Float>()
-	{
-		@Override
-		public Unifier<Float> getUnifier()
-		{
-			UnifierSumNumber<Float> ret = new UnifierSumNumber<Float>();
-			ret.setType(Float.class);
-			return ret;
-		}
-	};
+  /**
+   * Output float sum port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Float> sumFloat = new DefaultOutputPort<Float>()
+  {
+    @Override
+    public Unifier<Float> getUnifier()
+    {
+      UnifierSumNumber<Float> ret = new UnifierSumNumber<Float>();
+      ret.setType(Float.class);
+      return ret;
+    }
+  };
 
-	/**
-	 * Redis server output port.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<Map<Integer, Integer>> redisport = new DefaultOutputPort<Map<Integer, Integer>>();
+  /**
+   * Redis server output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Map<Integer, Integer>> redisport = new DefaultOutputPort<Map<Integer, Integer>>();
 
-	/**
-	 * Check if sum has to be cumulative.
-	 * 
-	 * @return cumulative flag
-	 */
-	public boolean isCumulative()
-	{
-		return cumulative;
-	}
+  /**
+   * Check if sum has to be cumulative.
+   *
+   * @return cumulative flag
+   */
+  public boolean isCumulative()
+  {
+    return cumulative;
+  }
 
-	/**
-	 * Set cumulative flag.
-	 * 
-	 * @param cumulative
-	 *          flag
-	 */
-	public void setCumulative(boolean cumulative)
-	{
-		this.cumulative = cumulative;
-	}
+  /**
+   * Set cumulative flag.
+   *
+   * @param cumulative
+   *          flag
+   */
+  public void setCumulative(boolean cumulative)
+  {
+    this.cumulative = cumulative;
+  }
 
-	/**
-	 * Emits sum and count if ports are connected
-	 */
-	@Override
-	public void endWindow()
-	{
-		if (doEmit()) {
-			sum.emit(getValue(sums));
-			sumDouble.emit(sums);
-			sumInteger.emit((int) sums);
-			sumLong.emit((long) sums);
-			sumShort.emit((short) sums);
-			sumFloat.emit((float) sums);
-			tupleAvailable = false;
-			Map<Integer, Integer> redis = new HashMap<Integer, Integer>();
-			redis.put(1, (int) sums);
-			redisport.emit(redis);
-		}
-		clearCache();
-	}
+  /**
+   * Emits sum and count if ports are connected
+   */
+  @Override
+  public void endWindow()
+  {
+    if (doEmit()) {
+      sum.emit(getValue(sums));
+      sumDouble.emit(sums);
+      sumInteger.emit((int)sums);
+      sumLong.emit((long)sums);
+      sumShort.emit((short)sums);
+      sumFloat.emit((float)sums);
+      tupleAvailable = false;
+      Map<Integer, Integer> redis = new HashMap<Integer, Integer>();
+      redis.put(1, (int)sums);
+      redisport.emit(redis);
+    }
+    clearCache();
+  }
 
-	/**
-	 * Clears the cache making this operator stateless on window boundary
-	 */
-	private void clearCache()
-	{
-		if (!cumulative) {
-			sums = 0;
-		}
-	}
+  /**
+   * Clears the cache making this operator stateless on window boundary
+   */
+  private void clearCache()
+  {
+    if (!cumulative) {
+      sums = 0;
+    }
+  }
 
-	/**
-	 * Decides whether emit has to be done in this window on port "sum"
-	 * 
-	 * @return true is sum port is connected
-	 */
-	private boolean doEmit()
-	{
-		return tupleAvailable;
-	}
+  /**
+   * Decides whether emit has to be done in this window on port "sum"
+   *
+   * @return true is sum port is connected
+   */
+  private boolean doEmit()
+  {
+    return tupleAvailable;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/SumCountMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/SumCountMap.java b/library/src/main/java/com/datatorrent/lib/math/SumCountMap.java
index 76242f4..c2d8465 100644
--- a/library/src/main/java/com/datatorrent/lib/math/SumCountMap.java
+++ b/library/src/main/java/com/datatorrent/lib/math/SumCountMap.java
@@ -18,16 +18,18 @@
  */
 package com.datatorrent.lib.math;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.mutable.MutableDouble;
+import org.apache.commons.lang.mutable.MutableInt;
+
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
 import com.datatorrent.lib.util.UnifierHashMapInteger;
 import com.datatorrent.lib.util.UnifierHashMapSumKeys;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.commons.lang.mutable.MutableDouble;
-import org.apache.commons.lang.mutable.MutableInt;
 
 /**
  * Emits the sum and count of values for each key at the end of window.
@@ -58,244 +60,244 @@ import org.apache.commons.lang.mutable.MutableInt;
  * @since 0.3.3
  */
 public class SumCountMap<K, V extends Number> extends
-		BaseNumberKeyValueOperator<K, V>
+    BaseNumberKeyValueOperator<K, V>
 {
-	/**
-	 * Key/double sum map.
-	 */
-	protected HashMap<K, MutableDouble> sums = new HashMap<K, MutableDouble>();
+  /**
+   * Key/double sum map.
+   */
+  protected HashMap<K, MutableDouble> sums = new HashMap<K, MutableDouble>();
+
+  /**
+   * Key/integer sum map.
+   */
+  protected HashMap<K, MutableInt> counts = new HashMap<K, MutableInt>();
 
-	/**
-	 * Key/integer sum map.
-	 */
-	protected HashMap<K, MutableInt> counts = new HashMap<K, MutableInt>();
+  /**
+   * Cumulative sum flag.
+   */
+  protected boolean cumulative = false;
 
-	/**
-	 * Cumulative sum flag.
-	 */
-	protected boolean cumulative = false;
+  /**
+   * Input port that takes a map.&nbsp; It adds the values for each key and counts the number of occurrences for each key.
+   */
+  public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>()
+  {
+    /**
+     * For each tuple (a HashMap of keys,val pairs) Adds the values for each
+     * key, Counts the number of occurrences of each key
+     */
+    @Override
+    public void process(Map<K, V> tuple)
+    {
+      for (Map.Entry<K, V> e : tuple.entrySet()) {
+        K key = e.getKey();
+        if (!doprocessKey(key)) {
+          continue;
+        }
+        if (sum.isConnected()) {
+          MutableDouble val = sums.get(key);
+          if (val == null) {
+            val = new MutableDouble(e.getValue().doubleValue());
+          } else {
+            val.add(e.getValue().doubleValue());
+          }
+          sums.put(cloneKey(key), val);
+        }
+        if (SumCountMap.this.count.isConnected()) {
+          MutableInt count = counts.get(key);
+          if (count == null) {
+            count = new MutableInt(0);
+            counts.put(cloneKey(key), count);
+          }
+          count.increment();
+        }
+      }
+    }
+  };
 
-	/**
-	 * Input port that takes a map.&nbsp; It adds the values for each key and counts the number of occurrences for each key.
-	 */
-	public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>()
-	{
-		/**
-		 * For each tuple (a HashMap of keys,val pairs) Adds the values for each
-		 * key, Counts the number of occurrences of each key
-		 */
-		@Override
-		public void process(Map<K, V> tuple)
-		{
-			for (Map.Entry<K, V> e : tuple.entrySet()) {
-				K key = e.getKey();
-				if (!doprocessKey(key)) {
-					continue;
-				}
-				if (sum.isConnected()) {
-					MutableDouble val = sums.get(key);
-					if (val == null) {
-						val = new MutableDouble(e.getValue().doubleValue());
-					} else {
-						val.add(e.getValue().doubleValue());
-					}
-					sums.put(cloneKey(key), val);
-				}
-				if (SumCountMap.this.count.isConnected()) {
-					MutableInt count = counts.get(key);
-					if (count == null) {
-						count = new MutableInt(0);
-						counts.put(cloneKey(key), count);
-					}
-					count.increment();
-				}
-			}
-		}
-	};
+  /**
+   * Key,sum map output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<HashMap<K, V>> sum = new DefaultOutputPort<HashMap<K, V>>()
+  {
+    @Override
+    public Unifier<HashMap<K, V>> getUnifier()
+    {
+      return new UnifierHashMapSumKeys<K, V>();
+    }
+  };
 
-	/**
-	 * Key,sum map output port.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<HashMap<K, V>> sum = new DefaultOutputPort<HashMap<K, V>>()
-	{
-		@Override
-		public Unifier<HashMap<K, V>> getUnifier()
-		{
-			return new UnifierHashMapSumKeys<K, V>();
-		}
-	};
+  /**
+   * Key,double sum map output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<HashMap<K, Double>> sumDouble = new DefaultOutputPort<HashMap<K, Double>>()
+  {
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public Unifier<HashMap<K, Double>> getUnifier()
+    {
+      UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Double>();
+      ret.setType(Double.class);
+      return ret;
+    }
+  };
 
-	/**
-	 * Key,double sum map output port.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<HashMap<K, Double>> sumDouble = new DefaultOutputPort<HashMap<K, Double>>()
-	{
-		@SuppressWarnings({ "rawtypes", "unchecked" })
-		@Override
-		public Unifier<HashMap<K, Double>> getUnifier()
-		{
-			UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Double>();
-			ret.setType(Double.class);
-			return ret;
-		}
-	};
+  /**
+   * Key,integer sum output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<HashMap<K, Integer>> sumInteger = new DefaultOutputPort<HashMap<K, Integer>>()
+  {
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public Unifier<HashMap<K, Integer>> getUnifier()
+    {
+      UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Integer>();
+      ret.setType(Integer.class);
+      return ret;
+    }
+  };
 
-	/**
-	 * Key,integer sum output port.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<HashMap<K, Integer>> sumInteger = new DefaultOutputPort<HashMap<K, Integer>>()
-	{
-		@SuppressWarnings({ "rawtypes", "unchecked" })
-		@Override
-		public Unifier<HashMap<K, Integer>> getUnifier()
-		{
-			UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Integer>();
-			ret.setType(Integer.class);
-			return ret;
-		}
-	};
 
-	
         /**
-	 * Key,long sum output port.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<HashMap<K, Long>> sumLong = new DefaultOutputPort<HashMap<K, Long>>()
-	{
-		@SuppressWarnings({ "rawtypes", "unchecked" })
-		@Override
-		public Unifier<HashMap<K, Long>> getUnifier()
-		{
-			UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Long>();
-			ret.setType(Long.class);
-			return ret;
-		}
-	};
+   * Key,long sum output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<HashMap<K, Long>> sumLong = new DefaultOutputPort<HashMap<K, Long>>()
+  {
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public Unifier<HashMap<K, Long>> getUnifier()
+    {
+      UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Long>();
+      ret.setType(Long.class);
+      return ret;
+    }
+  };
         
         /**
-	 * Key,short sum output port.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<HashMap<K, Short>> sumShort = new DefaultOutputPort<HashMap<K, Short>>()
-	{
-		@SuppressWarnings({ "rawtypes", "unchecked" })
-		@Override
-		public Unifier<HashMap<K, Short>> getUnifier()
-		{
-			UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Short>();
-			ret.setType(Short.class);
-			return ret;
-		}
-	};
+   * Key,short sum output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<HashMap<K, Short>> sumShort = new DefaultOutputPort<HashMap<K, Short>>()
+  {
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public Unifier<HashMap<K, Short>> getUnifier()
+    {
+      UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Short>();
+      ret.setType(Short.class);
+      return ret;
+    }
+  };
         
         /**
-	 * Key,float sum output port.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<HashMap<K, Float>> sumFloat = new DefaultOutputPort<HashMap<K, Float>>()
-	{
-		@SuppressWarnings({ "rawtypes", "unchecked" })
-		@Override
-		public Unifier<HashMap<K, Float>> getUnifier()
-		{
-			UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Float>();
-			ret.setType(Float.class);
-			return ret;
-		}
-	};
+   * Key,float sum output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<HashMap<K, Float>> sumFloat = new DefaultOutputPort<HashMap<K, Float>>()
+  {
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public Unifier<HashMap<K, Float>> getUnifier()
+    {
+      UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Float>();
+      ret.setType(Float.class);
+      return ret;
+    }
+  };
         
         /**
-	 * Key,integer sum output port.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<HashMap<K, Integer>> count = new DefaultOutputPort<HashMap<K, Integer>>()
-	{
-		@Override
-		public Unifier<HashMap<K, Integer>> getUnifier()
-		{
-			return new UnifierHashMapInteger<K>();
-		}
-	};
+   * Key,integer sum output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<HashMap<K, Integer>> count = new DefaultOutputPort<HashMap<K, Integer>>()
+  {
+    @Override
+    public Unifier<HashMap<K, Integer>> getUnifier()
+    {
+      return new UnifierHashMapInteger<K>();
+    }
+  };
 
-	/**
-	 * Get cumulative flag.
-	 * 
-	 * @return cumulative flag
-	 */
-	public boolean isCumulative()
-	{
-		return cumulative;
-	}
+  /**
+   * Get cumulative flag.
+   *
+   * @return cumulative flag
+   */
+  public boolean isCumulative()
+  {
+    return cumulative;
+  }
 
-	/**
-	 * set cumulative flag.
-	 * 
-	 * @param cumulative
-	 *          input flag
-	 */
-	public void setCumulative(boolean cumulative)
-	{
-		this.cumulative = cumulative;
-	}
+  /**
+   * set cumulative flag.
+   *
+   * @param cumulative
+   *          input flag
+   */
+  public void setCumulative(boolean cumulative)
+  {
+    this.cumulative = cumulative;
+  }
 
-	/**
-	 * Emits on all ports that are connected. Data is precomputed during process
-	 * on input port endWindow just emits it for each key Clears the internal data
-	 * before return
-	 */
-	@Override
-	public void endWindow()
-	{
+  /**
+   * Emits on all ports that are connected. Data is precomputed during process
+   * on input port endWindow just emits it for each key Clears the internal data
+   * before return
+   */
+  @Override
+  public void endWindow()
+  {
 
-		// Should allow users to send each key as a separate tuple to load balance
-		// This is an aggregate node, so load balancing would most likely not be
-		// needed
+    // Should allow users to send each key as a separate tuple to load balance
+    // This is an aggregate node, so load balancing would most likely not be
+    // needed
 
-		HashMap<K, V> tuples = new HashMap<K, V>();
-		HashMap<K, Integer> ctuples = new HashMap<K, Integer>();
-		HashMap<K, Double> dtuples = new HashMap<K, Double>();
-		HashMap<K, Integer> ituples = new HashMap<K, Integer>();
-		HashMap<K, Float> ftuples = new HashMap<K, Float>();
-		HashMap<K, Long> ltuples = new HashMap<K, Long>();
-		HashMap<K, Short> stuples = new HashMap<K, Short>();
+    HashMap<K, V> tuples = new HashMap<K, V>();
+    HashMap<K, Integer> ctuples = new HashMap<K, Integer>();
+    HashMap<K, Double> dtuples = new HashMap<K, Double>();
+    HashMap<K, Integer> ituples = new HashMap<K, Integer>();
+    HashMap<K, Float> ftuples = new HashMap<K, Float>();
+    HashMap<K, Long> ltuples = new HashMap<K, Long>();
+    HashMap<K, Short> stuples = new HashMap<K, Short>();
 
-		for (Map.Entry<K, MutableDouble> e : sums.entrySet()) {
-			K key = e.getKey();
-			MutableDouble val = e.getValue();
-			tuples.put(key, getValue(val.doubleValue()));
-			dtuples.put(key, val.doubleValue());
-			ituples.put(key, val.intValue());
-			ftuples.put(key, val.floatValue());
-			ltuples.put(key, val.longValue());
-			stuples.put(key, val.shortValue());
-			// ctuples.put(key, counts.get(e.getKey()).toInteger());
-			MutableInt c = counts.get(e.getKey());
-			if (c != null) {
-				ctuples.put(key, c.toInteger());
-			}
-		}
+    for (Map.Entry<K, MutableDouble> e : sums.entrySet()) {
+      K key = e.getKey();
+      MutableDouble val = e.getValue();
+      tuples.put(key, getValue(val.doubleValue()));
+      dtuples.put(key, val.doubleValue());
+      ituples.put(key, val.intValue());
+      ftuples.put(key, val.floatValue());
+      ltuples.put(key, val.longValue());
+      stuples.put(key, val.shortValue());
+      // ctuples.put(key, counts.get(e.getKey()).toInteger());
+      MutableInt c = counts.get(e.getKey());
+      if (c != null) {
+        ctuples.put(key, c.toInteger());
+      }
+    }
 
-		sum.emit(tuples);
-		sumDouble.emit(dtuples);
-		sumInteger.emit(ituples);
-		sumLong.emit(ltuples);
-		sumShort.emit(stuples);
-		sumFloat.emit(ftuples);
-		count.emit(ctuples);
-		clearCache();
-	}
+    sum.emit(tuples);
+    sumDouble.emit(dtuples);
+    sumInteger.emit(ituples);
+    sumLong.emit(ltuples);
+    sumShort.emit(stuples);
+    sumFloat.emit(ftuples);
+    count.emit(ctuples);
+    clearCache();
+  }
 
-	/**
-	 * Clear sum maps.
-	 */
-	private void clearCache()
-	{
-		if (!cumulative) {
-			sums.clear();
-			counts.clear();
-		}
-	}
+  /**
+   * Clear sum maps.
+   */
+  private void clearCache()
+  {
+    if (!cumulative) {
+      sums.clear();
+      counts.clear();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/SumKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/SumKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/SumKeyVal.java
index d79a490..99e2492 100644
--- a/library/src/main/java/com/datatorrent/lib/math/SumKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/SumKeyVal.java
@@ -23,13 +23,12 @@ import java.util.Map;
 
 import org.apache.commons.lang.mutable.MutableDouble;
 
-import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.StreamCodec;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
+import com.datatorrent.lib.util.KeyValPair;
 
 /**
  * Emits the sum of values for each key at the end of window.
@@ -93,8 +92,7 @@ public class SumKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K
       SumEntry val = sums.get(key);
       if (val == null) {
         val = new SumEntry(new MutableDouble(tuple.getValue().doubleValue()), true);
-      }
-      else {
+      } else {
         val.sum.add(tuple.getValue().doubleValue());
         val.changed = true;
       }
@@ -194,12 +192,11 @@ public class SumKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K
   public void clearCache()
   {
     if (cumulative) {
-      for (Map.Entry<K, SumEntry> e: sums.entrySet()) {
+      for (Map.Entry<K, SumEntry> e : sums.entrySet()) {
         SumEntry val = e.getValue();
         val.changed = false;
       }
-    }
-    else {
+    } else {
       sums.clear();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProduct.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProduct.java b/library/src/main/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProduct.java
index adc77fa..7f36ef5 100644
--- a/library/src/main/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProduct.java
+++ b/library/src/main/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProduct.java
@@ -18,12 +18,13 @@
  */
 package com.datatorrent.lib.math;
 
-import com.datatorrent.api.DefaultOutputPort;
-import org.xml.sax.InputSource;
-
 import java.io.StringReader;
 import java.util.List;
 
+import org.xml.sax.InputSource;
+
+import com.datatorrent.api.DefaultOutputPort;
+
 /**
  * An implementation of the AbstractXmlKeyValueCartesianProduct operator that takes in the xml document
  * as a String input and outputs the cartesian product as Strings.

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/multiwindow/AbstractSlidingWindow.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/multiwindow/AbstractSlidingWindow.java b/library/src/main/java/com/datatorrent/lib/multiwindow/AbstractSlidingWindow.java
index 231285f..7217086 100644
--- a/library/src/main/java/com/datatorrent/lib/multiwindow/AbstractSlidingWindow.java
+++ b/library/src/main/java/com/datatorrent/lib/multiwindow/AbstractSlidingWindow.java
@@ -22,9 +22,9 @@ import java.util.ArrayList;
 
 import javax.validation.constraints.Min;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  *
@@ -47,108 +47,108 @@ public abstract class AbstractSlidingWindow<T, S> extends BaseOperator
         /**
          * Input port for getting incoming data.
          */
-	public final transient DefaultInputPort<T> data = new DefaultInputPort<T>()
-	{
-		@Override
-		public void process(T tuple)
-		{
-			processDataTuple(tuple);
-		}
-	};
-
-	protected ArrayList<S> states = null;
-
-	protected S lastExpiredWindowState = null;
-
-	protected int currentCursor = -1;
-
-	@Min(2)
-	int windowSize = 2;
-
-	/**
-	 * getter function for n (number of previous window states
-	 *
-	 * @return n
-	 */
-	@Min(2)
-	public int getWindowSize()
-	{
-		return windowSize;
-	}
-
-	/**
-	 * setter for windowSize
-	 *
-	 * @param i
-	 */
-	public void setWindowSize(int windowSize)
-	{
-		this.windowSize = windowSize;
-	}
-
-	abstract protected void processDataTuple(T tuple);
-
-	/**
-	 * Implement this method to create the state object needs to be kept in the sliding window
-	 *
-	 * @return the state of current streaming window
-	 */
-	public abstract S createWindowState();
-
-	/**
-	 * Get the Streaming window state in it's coming the order start from 0
-	 *
-	 * @param i
-	 *   0 the state of the first coming streaming window
-	 *   -1 the state of the last expired streaming window
-	 * @return State of the streaming window
-	 * @throws ArrayIndexOutOfBoundsException if i >= sliding window size
-	 */
-	public S getStreamingWindowState(int i)
-	{
-	  if(i == -1){
-	    return lastExpiredWindowState;
-	  }
-		if (i >= getWindowSize()) {
-			throw new ArrayIndexOutOfBoundsException();
-		}
-		int index = (currentCursor + 1 + i) % windowSize ;
-		return states.get(index);
-	}
-
-	/**
-	 * Moves states by 1 and sets current state to null. If you override
-	 * beginWindow, you must call super.beginWindow(windowId) to ensure proper
-	 * operator behavior.
-	 *
-	 * @param windowId
-	 */
-	@Override
-	public void beginWindow(long windowId)
-	{
-	  // move currentCursor 1 position
-		currentCursor = (currentCursor + 1) % windowSize;
-		// expire the state at the first position which is the state of the streaming window moving out of the current application window
-		lastExpiredWindowState = states.get(currentCursor);
-
-		states.set(currentCursor, createWindowState());
-
-	}
-
-	/**
-	 * Sets up internal state structure
-	 *
-	 * @param context
-	 */
-	@Override
-	public void setup(OperatorContext context)
-	{
-	  super.setup(context);
-		states = new ArrayList<S>(windowSize);
-		//initialize the sliding window state to null
-		for (int i = 0; i < windowSize; i++) {
-			states.add(null);
-		}
-		currentCursor = -1;
-	}
+  public final transient DefaultInputPort<T> data = new DefaultInputPort<T>()
+  {
+    @Override
+    public void process(T tuple)
+    {
+      processDataTuple(tuple);
+    }
+  };
+
+  protected ArrayList<S> states = null;
+
+  protected S lastExpiredWindowState = null;
+
+  protected int currentCursor = -1;
+
+  @Min(2)
+  int windowSize = 2;
+
+  /**
+   * getter function for n (number of previous window states
+   *
+   * @return n
+   */
+  @Min(2)
+  public int getWindowSize()
+  {
+    return windowSize;
+  }
+
+  /**
+   * setter for windowSize
+   *
+   * @param windowSize
+   */
+  public void setWindowSize(int windowSize)
+  {
+    this.windowSize = windowSize;
+  }
+
+  protected abstract void processDataTuple(T tuple);
+
+  /**
+   * Implement this method to create the state object needs to be kept in the sliding window
+   *
+   * @return the state of current streaming window
+   */
+  public abstract S createWindowState();
+
+  /**
+   * Get the Streaming window state in it's coming the order start from 0
+   *
+   * @param i
+   *   0 the state of the first coming streaming window
+   *   -1 the state of the last expired streaming window
+   * @return State of the streaming window
+   * @throws ArrayIndexOutOfBoundsException if i >= sliding window size
+   */
+  public S getStreamingWindowState(int i)
+  {
+    if (i == -1) {
+      return lastExpiredWindowState;
+    }
+    if (i >= getWindowSize()) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+    int index = (currentCursor + 1 + i) % windowSize;
+    return states.get(index);
+  }
+
+  /**
+   * Moves states by 1 and sets current state to null. If you override
+   * beginWindow, you must call super.beginWindow(windowId) to ensure proper
+   * operator behavior.
+   *
+   * @param windowId
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    // move currentCursor 1 position
+    currentCursor = (currentCursor + 1) % windowSize;
+    // expire the state at the first position which is the state of the streaming window moving out of the current application window
+    lastExpiredWindowState = states.get(currentCursor);
+
+    states.set(currentCursor, createWindowState());
+
+  }
+
+  /**
+   * Sets up internal state structure
+   *
+   * @param context
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+    states = new ArrayList<S>(windowSize);
+    //initialize the sliding window state to null
+    for (int i = 0; i < windowSize; i++) {
+      states.add(null);
+    }
+    currentCursor = -1;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/multiwindow/AbstractSlidingWindowKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/multiwindow/AbstractSlidingWindowKeyVal.java b/library/src/main/java/com/datatorrent/lib/multiwindow/AbstractSlidingWindowKeyVal.java
index 35e777b..69e8819 100644
--- a/library/src/main/java/com/datatorrent/lib/multiwindow/AbstractSlidingWindowKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/multiwindow/AbstractSlidingWindowKeyVal.java
@@ -52,104 +52,106 @@ import com.datatorrent.lib.util.KeyValPair;
  * @since 0.3.3
  */
 public abstract class AbstractSlidingWindowKeyVal<K, V extends Number, S extends SimpleMovingAverageObject>
-		extends BaseNumberKeyValueOperator<K, V>
+    extends BaseNumberKeyValueOperator<K, V>
 {
-	/**
-	 * buffer to hold state information of different windows.
-	 */
-	protected HashMap<K, ArrayList<S>> buffer = new HashMap<K, ArrayList<S>>();
-	/**
-	 * Index of windows stating at 0.
-	 */
-	protected int currentstate = -1;
+  /**
+   * buffer to hold state information of different windows.
+   */
+  protected HashMap<K, ArrayList<S>> buffer = new HashMap<K, ArrayList<S>>();
+  /**
+   * Index of windows stating at 0.
+   */
+  protected int currentstate = -1;
 
-	/**
-	 * Concrete class has to implement how they want the tuple to be processed.
-	 *
-	 * @param tuple
-	 *          a keyVal pair of tuple.
-	 */
-	public abstract void processDataTuple(KeyValPair<K, V> tuple);
+  /**
+   * Concrete class has to implement how they want the tuple to be processed.
+   *
+   * @param tuple
+   *          a keyVal pair of tuple.
+   */
+  public abstract void processDataTuple(KeyValPair<K, V> tuple);
 
-	/**
-	 * Concrete class has to implement what to emit at the end of window.
-	 *
-	 * @param key
-	 * @param obj
-	 */
-	public abstract void emitTuple(K key, ArrayList<S> obj);
+  /**
+   * Concrete class has to implement what to emit at the end of window.
+   *
+   * @param key
+   * @param obj
+   */
+  public abstract void emitTuple(K key, ArrayList<S> obj);
 
-	/**
-	 * Length of sliding windows. Minimum value is 2.
-	 */
-	@Min(2)
-	protected int windowSize = 2;
-	protected long windowId;
+  /**
+   * Length of sliding windows. Minimum value is 2.
+   */
+  @Min(2)
+  protected int windowSize = 2;
+  protected long windowId;
 
-	/**
-	 * Getter function for windowSize (number of previous window buffer).
-	 *
-	 * @return windowSize
-	 */
-	public int getWindowSize()
-	{
-		return windowSize;
-	}
+  /**
+   * Getter function for windowSize (number of previous window buffer).
+   *
+   * @return windowSize
+   */
+  public int getWindowSize()
+  {
+    return windowSize;
+  }
 
-	/**
-	 * @param windowSize
-	 */
-	public void setWindowSize(int windowSize)
-	{
-		this.windowSize = windowSize;
-	}
+  /**
+   * @param windowSize
+   */
+  public void setWindowSize(int windowSize)
+  {
+    this.windowSize = windowSize;
+  }
 
-	/**
-	 * Input port for getting incoming data.
-	 */
-	public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
-	{
-		@Override
-		public void process(KeyValPair<K, V> tuple)
-		{
-			processDataTuple(tuple);
-		}
-	};
+  /**
+   * Input port for getting incoming data.
+   */
+  public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
+  {
+    @Override
+    public void process(KeyValPair<K, V> tuple)
+    {
+      processDataTuple(tuple);
+    }
+  };
 
-	/**
-	 * Moves buffer by 1 and clear contents of current. If you override
-	 * beginWindow, you must call super.beginWindow(windowId) to ensure proper
-	 * operator behavior.
-	 *
-	 * @param windowId
-	 */
-	@Override
-	public void beginWindow(long windowId)
-	{
-		this.windowId = windowId;
-		currentstate++;
-		if (currentstate >= windowSize) {
-			for (Map.Entry<K, ArrayList<S>> e : buffer.entrySet()) {
-				ArrayList<S> states = e.getValue();
-                		S first = states.get(0);
-				for (int i=1; i < windowSize; i++) states.set(i-1, states.get(i));
-				states.set(windowSize-1, first);
-			}
-			currentstate = windowSize-1;
-		}
-		for (Map.Entry<K, ArrayList<S>> e : buffer.entrySet()) {
-			e.getValue().get(currentstate).clear();
-		}
-	}
+  /**
+   * Moves buffer by 1 and clear contents of current. If you override
+   * beginWindow, you must call super.beginWindow(windowId) to ensure proper
+   * operator behavior.
+   *
+   * @param windowId
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    this.windowId = windowId;
+    currentstate++;
+    if (currentstate >= windowSize) {
+      for (Map.Entry<K, ArrayList<S>> e : buffer.entrySet()) {
+        ArrayList<S> states = e.getValue();
+        S first = states.get(0);
+        for (int i = 1; i < windowSize; i++) {
+          states.set(i - 1, states.get(i));
+        }
+        states.set(windowSize - 1, first);
+      }
+      currentstate = windowSize - 1;
+    }
+    for (Map.Entry<K, ArrayList<S>> e : buffer.entrySet()) {
+      e.getValue().get(currentstate).clear();
+    }
+  }
 
-	/**
-	 * Emit tuple for each key.
-	 */
-	@Override
-	public void endWindow()
-	{
-		for (Map.Entry<K, ArrayList<S>> e : buffer.entrySet()) {
-			emitTuple(e.getKey(), e.getValue());
-		}
-	}
+  /**
+   * Emit tuple for each key.
+   */
+  @Override
+  public void endWindow()
+  {
+    for (Map.Entry<K, ArrayList<S>> e : buffer.entrySet()) {
+      emitTuple(e.getKey(), e.getValue());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/multiwindow/MultiWindowRangeKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/multiwindow/MultiWindowRangeKeyVal.java b/library/src/main/java/com/datatorrent/lib/multiwindow/MultiWindowRangeKeyVal.java
index 09a9c24..ad4f8e5 100644
--- a/library/src/main/java/com/datatorrent/lib/multiwindow/MultiWindowRangeKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/multiwindow/MultiWindowRangeKeyVal.java
@@ -71,7 +71,7 @@ public class MultiWindowRangeKeyVal<K, V extends Number> extends RangeKeyVal<K,
    * Clears the internal data before return
    */
   @SuppressWarnings({ "unchecked", "rawtypes" })
-	@Override
+  @Override
   public void endWindow()
   {
     boolean emit = (++windowCount) % windowSize == 0;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/multiwindow/MultiWindowSumKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/multiwindow/MultiWindowSumKeyVal.java b/library/src/main/java/com/datatorrent/lib/multiwindow/MultiWindowSumKeyVal.java
index d6463bd..7d5c377 100644
--- a/library/src/main/java/com/datatorrent/lib/multiwindow/MultiWindowSumKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/multiwindow/MultiWindowSumKeyVal.java
@@ -18,13 +18,14 @@
  */
 package com.datatorrent.lib.multiwindow;
 
+import java.util.Map;
+
+import javax.validation.constraints.Min;
+
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.lib.math.SumKeyVal;
 import com.datatorrent.lib.util.KeyValPair;
 
-import java.util.Map;
-import javax.validation.constraints.Min;
-
 /**
  * A sum operator of KeyValPair schema which accumulates sum across multiple
  * streaming windows.
@@ -53,45 +54,45 @@ import javax.validation.constraints.Min;
 @OperatorAnnotation(partitionable = false)
 public class MultiWindowSumKeyVal<K, V extends Number> extends SumKeyVal<K, V>
 {
-	/**
-	 * Number of streaming window after which tuple got emitted.
-	 */
-	@Min(2)
-	private int windowSize = 2;
-	private long windowCount = 0;
+  /**
+   * Number of streaming window after which tuple got emitted.
+   */
+  @Min(2)
+  private int windowSize = 2;
+  private long windowCount = 0;
 
-	public void setWindowSize(int windowSize)
-	{
-		this.windowSize = windowSize;
-	}
+  public void setWindowSize(int windowSize)
+  {
+    this.windowSize = windowSize;
+  }
 
-	/**
-	 * Emit only at the end of windowSize window boundary.
-	 */
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	@Override
-	public void endWindow()
-	{
-		boolean emit = (++windowCount) % windowSize == 0;
+  /**
+   * Emit only at the end of windowSize window boundary.
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Override
+  public void endWindow()
+  {
+    boolean emit = (++windowCount) % windowSize == 0;
 
-		if (!emit) {
-			return;
-		}
+    if (!emit) {
+      return;
+    }
 
-		// Emit only at the end of application window boundary.
-		boolean dosum = sum.isConnected();
+    // Emit only at the end of application window boundary.
+    boolean dosum = sum.isConnected();
 
-		if (dosum) {
-			for (Map.Entry<K, SumEntry> e : sums.entrySet()) {
-				K key = e.getKey();
-				if (dosum) {
-					sum.emit(new KeyValPair(key, getValue(e.getValue().sum.doubleValue())));
-				}
-			}
-		}
+    if (dosum) {
+      for (Map.Entry<K, SumEntry> e : sums.entrySet()) {
+        K key = e.getKey();
+        if (dosum) {
+          sum.emit(new KeyValPair(key, getValue(e.getValue().sum.doubleValue())));
+        }
+      }
+    }
 
-		// Clear cumulative sum at the end of application window boundary.
-		sums.clear();
-	}
+    // Clear cumulative sum at the end of application window boundary.
+    sums.clear();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/multiwindow/SimpleMovingAverage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/multiwindow/SimpleMovingAverage.java b/library/src/main/java/com/datatorrent/lib/multiwindow/SimpleMovingAverage.java
index c49c2b4..4d0a2c1 100644
--- a/library/src/main/java/com/datatorrent/lib/multiwindow/SimpleMovingAverage.java
+++ b/library/src/main/java/com/datatorrent/lib/multiwindow/SimpleMovingAverage.java
@@ -48,85 +48,85 @@ import com.datatorrent.lib.util.KeyValPair;
  */
 @OperatorAnnotation(partitionable = false)
 public class SimpleMovingAverage<K, V extends Number> extends
-		AbstractSlidingWindowKeyVal<K, V, SimpleMovingAverageObject>
+    AbstractSlidingWindowKeyVal<K, V, SimpleMovingAverageObject>
 {
-	/**
-	 * Output port to emit simple moving average (SMA) of last N window as Double.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<KeyValPair<K, Double>> doubleSMA = new DefaultOutputPort<KeyValPair<K, Double>>();
-	/**
-	 * Output port to emit simple moving average (SMA) of last N window as Float.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<KeyValPair<K, Float>> floatSMA = new DefaultOutputPort<KeyValPair<K, Float>>();
-	/**
-	 * Output port to emit simple moving average (SMA) of last N window as Long.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<KeyValPair<K, Long>> longSMA = new DefaultOutputPort<KeyValPair<K, Long>>();
-	/**
-	 * Output port to emit simple moving average (SMA) of last N window as
-	 * Integer.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<KeyValPair<K, Integer>> integerSMA = new DefaultOutputPort<KeyValPair<K, Integer>>();
+  /**
+   * Output port to emit simple moving average (SMA) of last N window as Double.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<K, Double>> doubleSMA = new DefaultOutputPort<KeyValPair<K, Double>>();
+  /**
+   * Output port to emit simple moving average (SMA) of last N window as Float.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<K, Float>> floatSMA = new DefaultOutputPort<KeyValPair<K, Float>>();
+  /**
+   * Output port to emit simple moving average (SMA) of last N window as Long.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<K, Long>> longSMA = new DefaultOutputPort<KeyValPair<K, Long>>();
+  /**
+   * Output port to emit simple moving average (SMA) of last N window as
+   * Integer.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<K, Integer>> integerSMA = new DefaultOutputPort<KeyValPair<K, Integer>>();
 
-	/**
-	 * Create the list if key doesn't exist. Add value to buffer and increment
-	 * counter.
-	 *
-	 * @param tuple
-	 */
-	@Override
-	public void processDataTuple(KeyValPair<K, V> tuple)
-	{
-		K key = tuple.getKey();
-		double val = tuple.getValue().doubleValue();
-		ArrayList<SimpleMovingAverageObject> dataList = buffer.get(key);
+  /**
+   * Create the list if key doesn't exist. Add value to buffer and increment
+   * counter.
+   *
+   * @param tuple
+   */
+  @Override
+  public void processDataTuple(KeyValPair<K, V> tuple)
+  {
+    K key = tuple.getKey();
+    double val = tuple.getValue().doubleValue();
+    ArrayList<SimpleMovingAverageObject> dataList = buffer.get(key);
 
-		if (dataList == null) {
-			dataList = new ArrayList<SimpleMovingAverageObject>(windowSize);
-			for (int i = 0; i < windowSize; ++i) {
-				dataList.add(new SimpleMovingAverageObject());
-			}
-		}
+    if (dataList == null) {
+      dataList = new ArrayList<SimpleMovingAverageObject>(windowSize);
+      for (int i = 0; i < windowSize; ++i) {
+        dataList.add(new SimpleMovingAverageObject());
+      }
+    }
 
-		dataList.get(currentstate).add(val); // add to previous value
-		buffer.put(key, dataList);
-	}
+    dataList.get(currentstate).add(val); // add to previous value
+    buffer.put(key, dataList);
+  }
 
-	/**
-	 * Calculate average and emit in appropriate port.
-	 *
-	 * @param key
-	 * @param obj
-	 */
-	@Override
-	public void emitTuple(K key, ArrayList<SimpleMovingAverageObject> obj)
-	{
-		double sum = 0;
-		int count = 0;
-		for (int i = 0; i < windowSize; i++) {
-			SimpleMovingAverageObject d = obj.get(i);
-			sum += d.getSum();
-			count += d.getCount();
-		}
+  /**
+   * Calculate average and emit in appropriate port.
+   *
+   * @param key
+   * @param obj
+   */
+  @Override
+  public void emitTuple(K key, ArrayList<SimpleMovingAverageObject> obj)
+  {
+    double sum = 0;
+    int count = 0;
+    for (int i = 0; i < windowSize; i++) {
+      SimpleMovingAverageObject d = obj.get(i);
+      sum += d.getSum();
+      count += d.getCount();
+    }
 
-		if (count == 0) { // Nothing to emit.
-			return;
-		}
-		if (doubleSMA.isConnected()) {
-			doubleSMA.emit(new KeyValPair<K, Double>(key, (sum / count)));
-		}
-		if (floatSMA.isConnected()) {
-			floatSMA.emit(new KeyValPair<K, Float>(key, (float) (sum / count)));
-		}
-		if (longSMA.isConnected()) {
-			longSMA.emit(new KeyValPair<K, Long>(key, (long) (sum / count)));
-		}
-		if (integerSMA.isConnected()) {
-			integerSMA.emit(new KeyValPair<K, Integer>(key, (int) (sum / count)));
-		}
-	}
+    if (count == 0) { // Nothing to emit.
+      return;
+    }
+    if (doubleSMA.isConnected()) {
+      doubleSMA.emit(new KeyValPair<K, Double>(key, (sum / count)));
+    }
+    if (floatSMA.isConnected()) {
+      floatSMA.emit(new KeyValPair<K, Float>(key, (float)(sum / count)));
+    }
+    if (longSMA.isConnected()) {
+      longSMA.emit(new KeyValPair<K, Long>(key, (long)(sum / count)));
+    }
+    if (integerSMA.isConnected()) {
+      integerSMA.emit(new KeyValPair<K, Integer>(key, (int)(sum / count)));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/multiwindow/SortedMovingWindow.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/multiwindow/SortedMovingWindow.java b/library/src/main/java/com/datatorrent/lib/multiwindow/SortedMovingWindow.java
index 8d3228b..892c822 100644
--- a/library/src/main/java/com/datatorrent/lib/multiwindow/SortedMovingWindow.java
+++ b/library/src/main/java/com/datatorrent/lib/multiwindow/SortedMovingWindow.java
@@ -28,10 +28,12 @@ import java.util.PriorityQueue;
 
 import javax.validation.constraints.NotNull;
 
+import org.apache.commons.lang.ClassUtils;
+
+import com.google.common.base.Function;
+
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.google.common.base.Function;
-import org.apache.commons.lang.ClassUtils;
 
 /**
  *
@@ -102,7 +104,7 @@ public class SortedMovingWindow<T, K> extends AbstractSlidingWindow<T, List<T>>
   {
     super.endWindow();
     tuplesInCurrentStreamWindow = new LinkedList<T>();
-    if(lastExpiredWindowState == null){
+    if (lastExpiredWindowState == null) {
       // not ready to emit value or empty in a certain window
       return;
     }
@@ -115,7 +117,7 @@ public class SortedMovingWindow<T, K> extends AbstractSlidingWindow<T, List<T>>
         int k = 0;
         if (comparator == null) {
           if (expiredTuple instanceof Comparable) {
-            k = ((Comparable<T>) expiredTuple).compareTo(minElemInSortedList);
+            k = ((Comparable<T>)expiredTuple).compareTo(minElemInSortedList);
           } else {
             errorOutput.emit(expiredTuple);
             throw new IllegalArgumentException("Operator \"" + ClassUtils.getShortClassName(this.getClass()) + "\" encounters an invalid tuple " + expiredTuple + "\nNeither the tuple is comparable Nor Comparator is specified!");

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/partitioner/StatsAwareStatelessPartitioner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/partitioner/StatsAwareStatelessPartitioner.java b/library/src/main/java/com/datatorrent/lib/partitioner/StatsAwareStatelessPartitioner.java
index 6e2e01f..ed571b4 100644
--- a/library/src/main/java/com/datatorrent/lib/partitioner/StatsAwareStatelessPartitioner.java
+++ b/library/src/main/java/com/datatorrent/lib/partitioner/StatsAwareStatelessPartitioner.java
@@ -21,20 +21,24 @@ package com.datatorrent.lib.partitioner;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import javax.validation.constraints.Min;
 
-import com.google.common.collect.Sets;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Sets;
+
 import com.datatorrent.api.DefaultPartition;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.Partitioner;
 import com.datatorrent.api.StatsListener;
-
 import com.datatorrent.common.partitioner.StatelessPartitioner;
 
 /**
@@ -122,13 +126,11 @@ public abstract class StatsAwareStatelessPartitioner<T extends Operator> impleme
         response.repartitionRequired = true;
         logger.debug("setting repartition to true");
 
-      }
-      else if (!repartition) {
+      } else if (!repartition) {
         repartition = true;
         nextMillis = System.currentTimeMillis() + cooldownMillis;
       }
-    }
-    else {
+    } else {
       repartition = false;
     }
     return response;
@@ -147,8 +149,7 @@ public abstract class StatsAwareStatelessPartitioner<T extends Operator> impleme
       nextMillis = partitionNextMillis;
       // delegate to create initial list of partitions
       return new StatelessPartitioner<T>(initialPartitionCount).definePartitions(partitions, context);
-    }
-    else {
+    } else {
       // repartition call
       logger.debug("repartition call for operator");
       if (System.currentTimeMillis() < partitionNextMillis) {
@@ -169,8 +170,7 @@ public abstract class StatsAwareStatelessPartitioner<T extends Operator> impleme
             Partition<T> siblingPartition = lowLoadPartitions.remove(partitionKey & reducedMask);
             if (siblingPartition == null) {
               lowLoadPartitions.put(partitionKey & reducedMask, p);
-            }
-            else {
+            } else {
               // both of the partitions are low load, combine
               PartitionKeys newPks = new PartitionKeys(reducedMask, Sets.newHashSet(partitionKey & reducedMask));
               // put new value so the map gets marked as modified
@@ -181,8 +181,7 @@ public abstract class StatsAwareStatelessPartitioner<T extends Operator> impleme
               //LOG.debug("partition keys after merge {}", siblingPartition.getPartitionKeys());
             }
           }
-        }
-        else if (load > 0) {
+        } else if (load > 0) {
           // split bottlenecks
           Map<Operator.InputPort<?>, PartitionKeys> keys = p.getPartitionKeys();
           Map.Entry<Operator.InputPort<?>, PartitionKeys> e = keys.entrySet().iterator().next();
@@ -196,8 +195,7 @@ public abstract class StatsAwareStatelessPartitioner<T extends Operator> impleme
             int key = e.getValue().partitions.iterator().next();
             int key2 = (newMask ^ e.getValue().mask) | key;
             newKeys = Sets.newHashSet(key, key2);
-          }
-          else {
+          } else {
             // assign keys to separate partitions
             newMask = e.getValue().mask;
             newKeys = e.getValue().partitions;
@@ -208,8 +206,7 @@ public abstract class StatsAwareStatelessPartitioner<T extends Operator> impleme
             newPartition.getPartitionKeys().put(e.getKey(), new PartitionKeys(newMask, Sets.newHashSet(key)));
             newPartitions.add(newPartition);
           }
-        }
-        else {
+        } else {
           // leave unchanged
           newPartitions.add(p);
         }
@@ -229,8 +226,8 @@ public abstract class StatsAwareStatelessPartitioner<T extends Operator> impleme
     partitionedInstanceStatus.clear();
     for (Map.Entry<Integer, Partition<T>> entry : partitions.entrySet()) {
       if (partitionedInstanceStatus.containsKey(entry.getKey())) {
-      }
-      else {
+        //FIXME
+      } else {
         partitionedInstanceStatus.put(entry.getKey(), null);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/script/JavaScriptOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/script/JavaScriptOperator.java b/library/src/main/java/com/datatorrent/lib/script/JavaScriptOperator.java
index 1c5c96a..4e607f1 100644
--- a/library/src/main/java/com/datatorrent/lib/script/JavaScriptOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/script/JavaScriptOperator.java
@@ -18,14 +18,22 @@
  */
 package com.datatorrent.lib.script;
 
-import com.datatorrent.api.Context.OperatorContext;
-
 import java.util.HashMap;
 import java.util.Map;
-import javax.script.*;
+
+import javax.script.Invocable;
+import javax.script.ScriptContext;
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineManager;
+import javax.script.ScriptException;
+import javax.script.SimpleBindings;
+import javax.script.SimpleScriptContext;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.Context.OperatorContext;
+
 /**
  * An implementation of ScriptOperator that executes JavaScript on tuples input for Map &lt;String, Object&gt;.
  *
@@ -46,7 +54,7 @@ import org.slf4j.LoggerFactory;
  *
  * // Connect to output console operator
  * ConsoleOutputOperator console = dag.addOperator(&quot;console&quot;,
- * 		new ConsoleOutputOperator());
+ *   new ConsoleOutputOperator());
  * dag.addStream(&quot;rand_console&quot;, script.result, console.input);
  *
  * </pre>
@@ -54,17 +62,17 @@ import org.slf4j.LoggerFactory;
  * <b> Sample Input Operator(emit)</b>
  *
  * <pre>
- *  	.
- * 		.
- * 		public void emitTuples() {
- * 			HashMap<String, Object> map = new HashMap<String, Object>();
- * 			map.put("val", random.nextInt());
- * 			outport.emit(map);
- * 			.
- * 			.
- * 		}
- * 		.
- * 		.
+ * .
+ * .
+ * public void emitTuples() {
+ *    HashMap<String, Object> map = new HashMap<String, Object>();
+ *    map.put("val", random.nextInt());
+ *    outport.emit(map);
+ *    .
+ *    .
+ * }
+ * .
+ * .
  * </pre>
  *
  * This operator does not checkpoint interpreted functions in the variable bindings because they are not serializable
@@ -80,9 +88,8 @@ public class JavaScriptOperator extends ScriptOperator
 
   public enum Type
   {
-
     EVAL, INVOKE
-  };
+  }
 
   protected transient ScriptEngineManager sem = new ScriptEngineManager();
   protected transient ScriptEngine engine = sem.getEngineByName("JavaScript");
@@ -108,6 +115,8 @@ public class JavaScriptOperator extends ScriptOperator
         case INVOKE:
           evalResult = ((Invocable)engine).invokeFunction(script);
           break;
+        default:
+          //fallthru
       }
 
       if (isPassThru && result.isConnected()) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java b/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java
index f10b04c..9532180 100644
--- a/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java
@@ -18,16 +18,18 @@
  */
 package com.datatorrent.lib.script;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+
 import javax.validation.constraints.NotNull;
 
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
 /**
  * A base implementation of a BaseOperator for language script operator.&nbsp; Subclasses should provide the 
    implementation of getting the bindings and process method. 
@@ -96,5 +98,6 @@ public abstract class ScriptOperator extends BaseOperator
   }
 
   public abstract void process(Map<String, Object> tuple);
+
   public abstract Map<String, Object> getBindings();
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/statistics/MedianOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/statistics/MedianOperator.java b/library/src/main/java/com/datatorrent/lib/statistics/MedianOperator.java
index ebbe94a..95debb4 100644
--- a/library/src/main/java/com/datatorrent/lib/statistics/MedianOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/statistics/MedianOperator.java
@@ -21,10 +21,10 @@ package com.datatorrent.lib.statistics;
 import java.util.ArrayList;
 import java.util.Collections;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * An implementation of BaseOperator that computes median of incoming data. <br>
@@ -77,7 +77,9 @@ public class MedianOperator extends BaseOperator
   @Override
   public void endWindow()
   {
-    if (values.size() == 0) return;
+    if (values.size() == 0) {
+      return;
+    }
     if (values.size() == 1) {
       median.emit(values.get(0));
       return;
@@ -86,9 +88,9 @@ public class MedianOperator extends BaseOperator
     // median value
     Collections.sort(values);
     int medianIndex = values.size() / 2;
-    if (values.size() %2 == 0) {
-      Double value = values.get(medianIndex-1);
-      value = (value + values.get(medianIndex))/2;
+    if (values.size() % 2 == 0) {
+      Double value = values.get(medianIndex - 1);
+      value = (value + values.get(medianIndex)) / 2;
       median.emit(value);
     } else {
       median.emit(values.get(medianIndex));

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/statistics/ModeOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/statistics/ModeOperator.java b/library/src/main/java/com/datatorrent/lib/statistics/ModeOperator.java
index 84c9c46..f23f7b7 100644
--- a/library/src/main/java/com/datatorrent/lib/statistics/ModeOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/statistics/ModeOperator.java
@@ -21,10 +21,10 @@ package com.datatorrent.lib.statistics;
 import java.util.HashMap;
 import java.util.Map;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * An implementation of BaseOperator that calculates most frequent value occurring in incoming data. <br>
@@ -63,7 +63,7 @@ public class ModeOperator<V extends Comparable<?>> extends BaseOperator
     {
       if (values.containsKey(tuple)) {
         Integer count = values.remove(tuple);
-        values.put(tuple, count+1);
+        values.put(tuple, count + 1);
       } else {
         values.put(tuple, 1);
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/statistics/StandardDeviation.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/statistics/StandardDeviation.java b/library/src/main/java/com/datatorrent/lib/statistics/StandardDeviation.java
index 0a47450..8aaf63e 100644
--- a/library/src/main/java/com/datatorrent/lib/statistics/StandardDeviation.java
+++ b/library/src/main/java/com/datatorrent/lib/statistics/StandardDeviation.java
@@ -20,11 +20,11 @@ package com.datatorrent.lib.statistics;
 
 import java.util.ArrayList;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * An implementation of BaseOperator that computes variance and standard deviation over incoming data. <br>
@@ -67,7 +67,7 @@ public class StandardDeviation extends BaseOperator
   /**
    * Variance output port.
    */
-  @OutputPortFieldAnnotation(optional=true)
+  @OutputPortFieldAnnotation(optional = true)
   public final transient DefaultOutputPort<Number> variance = new DefaultOutputPort<Number>();
 
   /**
@@ -82,19 +82,21 @@ public class StandardDeviation extends BaseOperator
   public void endWindow()
   {
     // no values.
-    if (values.size() == 0) return;
+    if (values.size() == 0) {
+      return;
+    }
 
     // get mean first.
     double mean = 0.0;
     for (Double value : values) {
       mean += value;
     }
-    mean = mean/values.size();
+    mean = mean / values.size();
 
     // get variance
     double outVal = 0.0;
     for (Double value : values) {
-      outVal += (value-mean)*(value-mean);
+      outVal += (value - mean) * (value - mean);
     }
     outVal = outVal / values.size();
     if (variance.isConnected()) {