You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/05/18 20:42:04 UTC

[14/22] incubator-apex-malhar git commit: APEXMALHAR-2095 removed checkstyle violations of malhar library module

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/MultiWindowDimensionAggregation.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/MultiWindowDimensionAggregation.java b/library/src/main/java/com/datatorrent/lib/logs/MultiWindowDimensionAggregation.java
index 7c9c350..3cace73 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/MultiWindowDimensionAggregation.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/MultiWindowDimensionAggregation.java
@@ -27,9 +27,10 @@ import java.util.regex.Pattern;
 
 import javax.validation.constraints.NotNull;
 
-import org.apache.commons.lang.mutable.MutableDouble;
-import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.mutable.MutableDouble;
 
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
@@ -52,9 +53,10 @@ public class MultiWindowDimensionAggregation implements Operator
   @SuppressWarnings("unused")
   private static final Logger logger = LoggerFactory.getLogger(MultiWindowDimensionAggregation.class);
 
-  public enum AggregateOperation {
+  public enum AggregateOperation
+  {
     SUM, AVERAGE
-  };
+  }
 
   private int windowSize = 2;
   private int currentWindow = 0;
@@ -76,7 +78,8 @@ public class MultiWindowDimensionAggregation implements Operator
   /**
    * This is the input port which receives multi dimensional data.
    */
-  public final transient DefaultInputPort<Map<String, Map<String, Number>>> data = new DefaultInputPort<Map<String, Map<String, Number>>>() {
+  public final transient DefaultInputPort<Map<String, Map<String, Number>>> data = new DefaultInputPort<Map<String, Map<String, Number>>>()
+  {
     @Override
     public void process(Map<String, Map<String, Number>> tuple)
     {
@@ -169,12 +172,15 @@ public class MultiWindowDimensionAggregation implements Operator
   @Override
   public void setup(OperatorContext arg0)
   {
-    if (arg0 != null)
+    if (arg0 != null) {
       applicationWindowSize = arg0.getValue(OperatorContext.APPLICATION_WINDOW_COUNT);
-    if (cacheOject == null)
+    }
+    if (cacheOject == null) {
       cacheOject = new HashMap<Integer, Map<String, Map<String, Number>>>(windowSize);
-    if (outputMap == null)
+    }
+    if (outputMap == null) {
       outputMap = new HashMap<String, Map<String, KeyValPair<MutableDouble, Integer>>>();
+    }
     setUpPatternList();
 
   }
@@ -238,8 +244,9 @@ public class MultiWindowDimensionAggregation implements Operator
       }
     }
     currentWindowMap.clear();
-    if (patternList == null || patternList.isEmpty())
+    if (patternList == null || patternList.isEmpty()) {
       setUpPatternList();
+    }
 
   }
 
@@ -255,12 +262,13 @@ public class MultiWindowDimensionAggregation implements Operator
           outputData.put(e.getKey(), new DimensionObject<String>(keyVal.getKey(), dimensionValObj.getKey()));
         } else if (operationType == AggregateOperation.AVERAGE) {
           if (keyVal.getValue() != 0) {
-            double totalCount = ((double) (totalWindowsOccupied * applicationWindowSize)) / 1000;
+            double totalCount = ((double)(totalWindowsOccupied * applicationWindowSize)) / 1000;
             outputData.put(e.getKey(), new DimensionObject<String>(new MutableDouble(keyVal.getKey().doubleValue() / totalCount), dimensionValObj.getKey()));
           }
         }
-        if (!outputData.isEmpty())
+        if (!outputData.isEmpty()) {
           output.emit(outputData);
+        }
       }
     }
     currentWindow = (currentWindow + 1) % windowSize;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/RegexMatchMapOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/RegexMatchMapOperator.java b/library/src/main/java/com/datatorrent/lib/logs/RegexMatchMapOperator.java
index d64c634..e469b0c 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/RegexMatchMapOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/RegexMatchMapOperator.java
@@ -20,16 +20,19 @@ package com.datatorrent.lib.logs;
 
 import java.util.HashMap;
 import java.util.Map;
-import com.google.code.regexp.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.code.regexp.Matcher;
-import com.datatorrent.common.util.BaseOperator;
+import com.google.code.regexp.Pattern;
+
+import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.Stateless;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * This operator parses unstructured log data into named fields.
@@ -77,7 +80,7 @@ import org.slf4j.LoggerFactory;
  * @since 1.0.5
  */
 @Stateless
-@OperatorAnnotation(partitionable=true)
+@OperatorAnnotation(partitionable = true)
 public class RegexMatchMapOperator extends BaseOperator
 {
   /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java b/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java
index 7f903fc..5f09a4b 100644
--- a/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java
+++ b/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java
@@ -18,10 +18,10 @@
  */
 package com.datatorrent.lib.math;
 
-import com.datatorrent.api.DefaultInputPort;
-
 import java.util.Collection;
 
+import com.datatorrent.api.DefaultInputPort;
+
 /**
  * Aggregates input tuples that are collections of longs and double and emits result on four ports.
  * <p>
@@ -49,54 +49,52 @@ import java.util.Collection;
  * @since 0.3.3
  */
 public abstract class AbstractAggregateCalc<T extends Number> extends
-		AbstractOutput
+    AbstractOutput
 {
-	/**
-	 * Input port, accepts collection of values of type 'T'.
-	 */
-	public final transient DefaultInputPort<Collection<T>> input = new DefaultInputPort<Collection<T>>()
-	{
-		/**
-		 * Aggregate calculation result is only emitted on output port if it is connected.
-		 */
-		@Override
-		public void process(Collection<T> collection)
-		{
-			Double dResult = null;
-			if (doubleResult.isConnected()) {
-				doubleResult.emit(dResult = aggregateDoubles(collection));
-			}
+  /**
+   * Input port, accepts collection of values of type 'T'.
+   */
+  public final transient DefaultInputPort<Collection<T>> input = new DefaultInputPort<Collection<T>>()
+  {
+    /**
+     * Aggregate calculation result is only emitted on output port if it is connected.
+     */
+    @Override
+    public void process(Collection<T> collection)
+    {
+      Double dResult = null;
+      if (doubleResult.isConnected()) {
+        doubleResult.emit(dResult = aggregateDoubles(collection));
+      }
 
-			if (floatResult.isConnected()) {
-				floatResult
-						.emit(dResult == null ? (float) (aggregateDoubles(collection))
-								: dResult.floatValue());
-			}
+      if (floatResult.isConnected()) {
+        floatResult.emit(dResult == null ? (float)(aggregateDoubles(collection)) : dResult.floatValue());
+      }
 
-			Long lResult = null;
-			if (longResult.isConnected()) {
-				longResult.emit(lResult = aggregateLongs(collection));
-			}
+      Long lResult = null;
+      if (longResult.isConnected()) {
+        longResult.emit(lResult = aggregateLongs(collection));
+      }
 
-			if (integerResult.isConnected()) {
-				integerResult.emit(lResult == null ? (int) aggregateLongs(collection)
-						: lResult.intValue());
-			}
-		}
+      if (integerResult.isConnected()) {
+        integerResult.emit(lResult == null ? (int)aggregateLongs(collection)
+            : lResult.intValue());
+      }
+    }
 
-	};
+  };
 
-	/**
-	 * Abstract function to be implemented by sub class, custom calculation on input aggregate.
-	 * @param collection Aggregate of values 
-	 * @return calculated value.
-	 */
-	public abstract long aggregateLongs(Collection<T> collection);
+  /**
+   * Abstract function to be implemented by sub class, custom calculation on input aggregate.
+   * @param collection Aggregate of values
+   * @return calculated value.
+   */
+  public abstract long aggregateLongs(Collection<T> collection);
 
-	/**
-	 * Abstract function to be implemented by sub class, custom calculation on input aggregate.
-	 * @param collection Aggregate of values 
-	 * @return calculated value.
-	 */
-	public abstract double aggregateDoubles(Collection<T> collection);
+  /**
+   * Abstract function to be implemented by sub class, custom calculation on input aggregate.
+   * @param collection Aggregate of values
+   * @return calculated value.
+   */
+  public abstract double aggregateDoubles(Collection<T> collection);
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/AbstractOutput.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/AbstractOutput.java b/library/src/main/java/com/datatorrent/lib/math/AbstractOutput.java
index bb387b4..9600021 100644
--- a/library/src/main/java/com/datatorrent/lib/math/AbstractOutput.java
+++ b/library/src/main/java/com/datatorrent/lib/math/AbstractOutput.java
@@ -18,9 +18,9 @@
  */
 package com.datatorrent.lib.math;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * Abstract base operator defining optional double/float/long/integer output port.
@@ -34,27 +34,27 @@ import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
  */
 public abstract class AbstractOutput extends BaseOperator
 {
-	/**
-	 * Double type output.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<Double> doubleResult = new DefaultOutputPort<Double>();
+  /**
+   * Double type output.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Double> doubleResult = new DefaultOutputPort<Double>();
 
-	/**
-	 * Float type output.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<Float> floatResult = new DefaultOutputPort<Float>();
+  /**
+   * Float type output.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Float> floatResult = new DefaultOutputPort<Float>();
 
-	/**
-	 * Long type output.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<Long> longResult = new DefaultOutputPort<Long>();
+  /**
+   * Long type output.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Long> longResult = new DefaultOutputPort<Long>();
 
-	/**
-	 * Integer type output.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<Integer> integerResult = new DefaultOutputPort<Integer>();
+  /**
+   * Integer type output.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Integer> integerResult = new DefaultOutputPort<Integer>();
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/AbstractXmlCartesianProduct.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/AbstractXmlCartesianProduct.java b/library/src/main/java/com/datatorrent/lib/math/AbstractXmlCartesianProduct.java
index a6b7ab2..10b6c15 100644
--- a/library/src/main/java/com/datatorrent/lib/math/AbstractXmlCartesianProduct.java
+++ b/library/src/main/java/com/datatorrent/lib/math/AbstractXmlCartesianProduct.java
@@ -18,17 +18,23 @@
  */
 package com.datatorrent.lib.math;
 
-import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.DTThrowable;
-import com.datatorrent.lib.xml.AbstractXmlDOMOperator;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+
 import org.w3c.dom.Document;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 
-import javax.validation.constraints.NotNull;
-import javax.xml.xpath.*;
-import java.util.ArrayList;
-import java.util.List;
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.xml.AbstractXmlDOMOperator;
+import com.datatorrent.netlet.util.DTThrowable;
 
 /**
  * An operator that performs a cartesian product between different elements in a xml document.
@@ -146,7 +152,7 @@ public abstract class AbstractXmlCartesianProduct<T> extends AbstractXmlDOMOpera
     try {
       List<String> result = new ArrayList<String>();
       for (CartesianProduct cartesianProduct : cartesianProducts) {
-          cartesianProduct.product(document, result);
+        cartesianProduct.product(document, result);
       }
       processResult(result, tuple);
     } catch (XPathExpressionException e) {
@@ -252,8 +258,11 @@ public abstract class AbstractXmlCartesianProduct<T> extends AbstractXmlDOMOpera
           int balance = 1;
           int i;
           for (i = 1; (i < spec.length()) && (balance > 0); ++i) {
-            if (spec.charAt(i) == ')') balance--;
-            else if (spec.charAt(i) == '(') balance++;
+            if (spec.charAt(i) == ')') {
+              balance--;
+            } else if (spec.charAt(i) == '(') {
+              balance++;
+            }
           }
           if (i == spec.length()) {
             estr = spec.substring(1, spec.length() - 1);
@@ -358,10 +367,10 @@ public abstract class AbstractXmlCartesianProduct<T> extends AbstractXmlDOMOpera
             int chldEdDelIdx = productSpec.length() - 1;
             int chldSepDelIdx;
             if ((productSpec.charAt(chldStDelIdx) == '(') && (productSpec.charAt(chldEdDelIdx) == ')')
-                    && ((chldSepDelIdx = productSpec.indexOf(':')) != -1)) {
+                && ((chldSepDelIdx = productSpec.indexOf(':')) != -1)) {
               String child1Spec = productSpec.substring(chldStDelIdx + 1, chldSepDelIdx);
               String child2Spec = productSpec.substring(chldSepDelIdx + 1, chldEdDelIdx);
-              parentElement = (SimplePathElement) pathElement;
+              parentElement = (SimplePathElement)pathElement;
               childElement1 = pathElementFactory.getSpecable(child1Spec);
               childElement2 = pathElementFactory.getSpecable(child2Spec);
             }
@@ -419,7 +428,7 @@ public abstract class AbstractXmlCartesianProduct<T> extends AbstractXmlDOMOpera
   private List<Node> getNodes(Document document, String path) throws XPathExpressionException
   {
     XPathExpression pathExpr = xpath.compile(path);
-    NodeList nodeList = (NodeList) pathExpr.evaluate(document, XPathConstants.NODESET);
+    NodeList nodeList = (NodeList)pathExpr.evaluate(document, XPathConstants.NODESET);
     List<Node> nodes = new ArrayList<Node>();
     for (int i = 0; i < nodeList.getLength(); ++i) {
       nodes.add(nodeList.item(i));
@@ -459,8 +468,11 @@ public abstract class AbstractXmlCartesianProduct<T> extends AbstractXmlDOMOpera
     String delim = getDelim();
     boolean first = true;
     for (Node node : nodes) {
-      if (!first) sb.append(delim);
-      else first = false;
+      if (!first) {
+        sb.append(delim);
+      } else {
+        first = false;
+      }
       sb.append(getValue(node));
     }
     return sb.toString();

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java b/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java
index 03c98d3..91cc9ba 100644
--- a/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java
+++ b/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java
@@ -39,7 +39,8 @@ public abstract class AbstractXmlKeyValueCartesianProduct<T> extends AbstractXml
   }
 
   @Override
-  public boolean isValueNode(Node n) {
+  public boolean isValueNode(Node n)
+  {
     return isTextContainerNode(n);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Average.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Average.java b/library/src/main/java/com/datatorrent/lib/math/Average.java
index d956e05..4dfdf1f 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Average.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Average.java
@@ -44,77 +44,77 @@ import com.datatorrent.lib.util.BaseNumberValueOperator;
  */
 public class Average<V extends Number> extends BaseNumberValueOperator<V>
 {
-	/**
-	 * Input port that takes a number.
-	 */
-	public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
-	{
-		/**
-		 * Computes sum and count with each tuple
-		 */
-		@Override
-		public void process(V tuple)
-		{
-			sums += tuple.doubleValue();
-			counts++;
-		}
-	};
+  /**
+   * Input port that takes a number.
+   */
+  public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
+  {
+    /**
+     * Computes sum and count with each tuple
+     */
+    @Override
+    public void process(V tuple)
+    {
+      sums += tuple.doubleValue();
+      counts++;
+    }
+  };
 
-	/**
-	 * Output port that emits average as a number.
-	 */
-	public final transient DefaultOutputPort<V> average = new DefaultOutputPort<V>();
+  /**
+   * Output port that emits average as a number.
+   */
+  public final transient DefaultOutputPort<V> average = new DefaultOutputPort<V>();
 
-	protected double sums = 0;
-	protected long counts = 0;
+  protected double sums = 0;
+  protected long counts = 0;
 
-	/**
-	 * Emit average.
-	 */
-	@Override
-	public void endWindow()
-	{
-		// May want to send out only if count != 0
-		if (counts != 0) {
-			average.emit(getAverage());
-		}
-		sums = 0;
-		counts = 0;
-	}
+  /**
+   * Emit average.
+   */
+  @Override
+  public void endWindow()
+  {
+    // May want to send out only if count != 0
+    if (counts != 0) {
+      average.emit(getAverage());
+    }
+    sums = 0;
+    counts = 0;
+  }
 
-	/**
-	 * Calculate average based on number type.
-	 */
-	@SuppressWarnings("unchecked")
-	public V getAverage()
-	{
-		if (counts == 0) {
-			return null;
-		}
-		V num = getValue(sums);
-		Number val;
-		switch (getType()) {
-			case DOUBLE:
-				val = new Double(num.doubleValue() / counts);
-				break;
-			case INTEGER:
-				int icount = (int) (num.intValue() / counts);
-				val = new Integer(icount);
-				break;
-			case FLOAT:
-				val = new Float(num.floatValue() / counts);
-				break;
-			case LONG:
-				val = new Long(num.longValue() / counts);
-				break;
-			case SHORT:
-				short scount = (short) (num.shortValue() / counts);
-				val = new Short(scount);
-				break;
-			default:
-				val = new Double(num.doubleValue() / counts);
-				break;
-		}
-		return (V) val;
-	}
+  /**
+   * Calculate average based on number type.
+   */
+  @SuppressWarnings("unchecked")
+  public V getAverage()
+  {
+    if (counts == 0) {
+      return null;
+    }
+    V num = getValue(sums);
+    Number val;
+    switch (getType()) {
+      case DOUBLE:
+        val = new Double(num.doubleValue() / counts);
+        break;
+      case INTEGER:
+        int icount = (int)(num.intValue() / counts);
+        val = new Integer(icount);
+        break;
+      case FLOAT:
+        val = new Float(num.floatValue() / counts);
+        break;
+      case LONG:
+        val = new Long(num.longValue() / counts);
+        break;
+      case SHORT:
+        short scount = (short)(num.shortValue() / counts);
+        val = new Short(scount);
+        break;
+      default:
+        val = new Double(num.doubleValue() / counts);
+        break;
+    }
+    return (V)val;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/AverageKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/AverageKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/AverageKeyVal.java
index e9e7e40..e443780 100644
--- a/library/src/main/java/com/datatorrent/lib/math/AverageKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/AverageKeyVal.java
@@ -18,15 +18,17 @@
  */
 package com.datatorrent.lib.math;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.mutable.MutableDouble;
+import org.apache.commons.lang.mutable.MutableLong;
+
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
 import com.datatorrent.lib.util.KeyValPair;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.commons.lang.mutable.MutableDouble;
-import org.apache.commons.lang.mutable.MutableLong;
 
 /**
  *
@@ -57,84 +59,87 @@ import org.apache.commons.lang.mutable.MutableLong;
  */
 public class AverageKeyVal<K> extends BaseNumberKeyValueOperator<K, Number>
 {
-	// Aggregate sum of all values seen for a key.
-	protected HashMap<K, MutableDouble> sums = new HashMap<K, MutableDouble>();
-	
-	// Count of number of values seen for key.
-	protected HashMap<K, MutableLong> counts = new HashMap<K, MutableLong>();
-	
-	/**
-	 * Input port that takes a key value pair.
-	 */
-	public final transient DefaultInputPort<KeyValPair<K, ? extends Number>> data = new DefaultInputPort<KeyValPair<K, ? extends Number>>()
-	{
-		/**
-		 * Adds the values for each key, counts the number of occurrences of each
-		 * key and computes the average.
-		 */
-		@Override
-		public void process(KeyValPair<K, ? extends Number> tuple)
-		{
-			K key = tuple.getKey();
-			if (!doprocessKey(key)) {
-				return;
-			}
-			MutableDouble val = sums.get(key);
-			if (val == null) {
-				val = new MutableDouble(tuple.getValue().doubleValue());
-			} else {
-				val.add(tuple.getValue().doubleValue());
-			}
-			sums.put(cloneKey(key), val);
+  // Aggregate sum of all values seen for a key.
+  protected HashMap<K, MutableDouble> sums = new HashMap<K, MutableDouble>();
+
+  // Count of number of values seen for key.
+  protected HashMap<K, MutableLong> counts = new HashMap<K, MutableLong>();
+
+  /**
+   * Input port that takes a key value pair.
+   */
+  public final transient DefaultInputPort<KeyValPair<K, ? extends Number>> data = new DefaultInputPort<KeyValPair<K, ? extends Number>>()
+  {
+    /**
+     * Adds the values for each key, counts the number of occurrences of each
+     * key and computes the average.
+     */
+    @Override
+    public void process(KeyValPair<K, ? extends Number> tuple)
+    {
+      K key = tuple.getKey();
+      if (!doprocessKey(key)) {
+        return;
+      }
+      MutableDouble val = sums.get(key);
+      if (val == null) {
+        val = new MutableDouble(tuple.getValue().doubleValue());
+      } else {
+        val.add(tuple.getValue().doubleValue());
+      }
+      sums.put(cloneKey(key), val);
+
+      MutableLong count = counts.get(key);
+      if (count == null) {
+        count = new MutableLong(0);
+        counts.put(cloneKey(key), count);
+      }
+      count.increment();
+    }
+  };
+
+  /**
+   * Double average output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<K, Double>> doubleAverage =
+      new DefaultOutputPort<KeyValPair<K, Double>>();
 
-			MutableLong count = counts.get(key);
-			if (count == null) {
-				count = new MutableLong(0);
-				counts.put(cloneKey(key), count);
-			}
-			count.increment();
-		}
-	};
+  /**
+   * Integer average output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<K, Integer>> intAverage =
+      new DefaultOutputPort<KeyValPair<K, Integer>>();
 
-	/**
-	 * Double average output port. 
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<KeyValPair<K, Double>> doubleAverage = new DefaultOutputPort<KeyValPair<K, Double>>();
-	
-	/**
-	 * Integer average output port. 
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<KeyValPair<K, Integer>> intAverage = new DefaultOutputPort<KeyValPair<K, Integer>>();
-	
-	/**
-	 * Long average output port. 
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<KeyValPair<K, Long>> longAverage = new DefaultOutputPort<KeyValPair<K, Long>>();
+  /**
+   * Long average output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<K, Long>> longAverage =
+      new DefaultOutputPort<KeyValPair<K, Long>>();
 
-	/**
-	 * Emits average for each key in end window. Data is computed during process
-	 * on input port Clears the internal data before return.
-	 */
-	@Override
-	public void endWindow()
-	{
-		for (Map.Entry<K, MutableDouble> e : sums.entrySet()) {
-			K key = e.getKey();
-			double d = e.getValue().doubleValue();
-			if (doubleAverage.isConnected()) {
-				doubleAverage.emit(new KeyValPair<K, Double>(key, d / counts.get(key).doubleValue()));
-			}
-			if (intAverage.isConnected()) {
-				intAverage.emit(new KeyValPair<K, Integer>(key, (int) d));
-			}
-			if (longAverage.isConnected()) {
-				longAverage.emit(new KeyValPair<K, Long>(key, (long) d));
-			}
-		}
-		sums.clear();
-		counts.clear();
-	}
+  /**
+   * Emits average for each key in end window. Data is computed during process
+   * on input port Clears the internal data before return.
+   */
+  @Override
+  public void endWindow()
+  {
+    for (Map.Entry<K, MutableDouble> e : sums.entrySet()) {
+      K key = e.getKey();
+      double d = e.getValue().doubleValue();
+      if (doubleAverage.isConnected()) {
+        doubleAverage.emit(new KeyValPair<K, Double>(key, d / counts.get(key).doubleValue()));
+      }
+      if (intAverage.isConnected()) {
+        intAverage.emit(new KeyValPair<K, Integer>(key, (int)d));
+      }
+      if (longAverage.isConnected()) {
+        longAverage.emit(new KeyValPair<K, Long>(key, (long)d));
+      }
+    }
+    sums.clear();
+    counts.clear();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Change.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Change.java b/library/src/main/java/com/datatorrent/lib/math/Change.java
index 628839f..57bad6b 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Change.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Change.java
@@ -61,57 +61,57 @@ import com.datatorrent.lib.util.BaseNumberValueOperator;
 public class Change<V extends Number> extends BaseNumberValueOperator<V>
 {
         /**
-	 * Input data port that takes a number.
-	 */
-	public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
-	{
-		/**
-		 * Process each key, compute change or percent, and emit it.
-		 */
-		@Override
-		public void process(V tuple)
-		{
-			if (baseValue != 0) { // Avoid divide by zero, Emit an error tuple?
-				double cval = tuple.doubleValue() - baseValue;
-				change.emit(getValue(cval));
-				percent.emit((cval / baseValue) * 100);
-			}
-		}
-	};
+   * Input data port that takes a number.
+   */
+  public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
+  {
+    /**
+     * Process each key, compute change or percent, and emit it.
+     */
+    @Override
+    public void process(V tuple)
+    {
+      if (baseValue != 0) { // Avoid divide by zero, Emit an error tuple?
+        double cval = tuple.doubleValue() - baseValue;
+        change.emit(getValue(cval));
+        percent.emit((cval / baseValue) * 100);
+      }
+    }
+  };
         
         /**
-	 * Input port that takes a number&nbsp; It stores the value for base comparison. 
-	 */
-	public final transient DefaultInputPort<V> base = new DefaultInputPort<V>()
-	{
-		/**
-		 * Process each key to store the value. If same key appears again update
-		 * with latest value.
-		 */
-		@Override
-		public void process(V tuple)
-		{
-			if (tuple.doubleValue() != 0.0) { // Avoid divide by zero, Emit an error
-																				// tuple?
-				baseValue = tuple.doubleValue();
-			}
-		}
-	};
-	
-	/**
-	 * Output port that emits change in value compared to base value.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<V> change = new DefaultOutputPort<V>();
-	
-	/**
-	 * Output port that emits percent change in data value compared to base value.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<Double> percent = new DefaultOutputPort<Double>();
-	
-	/**
-	 * baseValue is a state full field. It is retained across windows.
-	 */
-	private double baseValue = 0;
+   * Input port that takes a number&nbsp; It stores the value for base comparison.
+   */
+  public final transient DefaultInputPort<V> base = new DefaultInputPort<V>()
+  {
+    /**
+     * Process each key to store the value. If same key appears again update
+     * with latest value.
+     */
+    @Override
+    public void process(V tuple)
+    {
+      if (tuple.doubleValue() != 0.0) { // Avoid divide by zero, Emit an error
+                                        // tuple?
+        baseValue = tuple.doubleValue();
+      }
+    }
+  };
+
+  /**
+   * Output port that emits change in value compared to base value.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<V> change = new DefaultOutputPort<V>();
+
+  /**
+   * Output port that emits percent change in data value compared to base value.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Double> percent = new DefaultOutputPort<Double>();
+
+  /**
+   * baseValue is a state full field. It is retained across windows.
+   */
+  private double baseValue = 0;
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/ChangeAlert.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeAlert.java b/library/src/main/java/com/datatorrent/lib/math/ChangeAlert.java
index 01a040d..3c48016 100644
--- a/library/src/main/java/com/datatorrent/lib/math/ChangeAlert.java
+++ b/library/src/main/java/com/datatorrent/lib/math/ChangeAlert.java
@@ -53,66 +53,66 @@ import com.datatorrent.lib.util.KeyValPair;
  */
 public class ChangeAlert<V extends Number> extends BaseNumberValueOperator<V>
 {
-	/**
-	 * Input port that takes in a number.
-	 */
-	public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
-	{
-		/**
-		 * Process each key, compute change or percent, and emit it. If we get 0 as
-		 * tuple next will be skipped.
-		 */
-		@Override
-		public void process(V tuple)
-		{
-			double tval = tuple.doubleValue();
-			if (baseValue == 0) { // Avoid divide by zero, Emit an error tuple?
-				baseValue = tval;
-				return;
-			}
-			double change = tval - baseValue;
-			double percent = (change / baseValue) * 100;
-			if (percent < 0.0) {
-				percent = 0.0 - percent;
-			}
-			if (percent > percentThreshold) {
-				KeyValPair<V, Double> kv = new KeyValPair<V, Double>(cloneKey(tuple),
-						percent);
-				alert.emit(kv);
-			}
-			baseValue = tval;
-		}
-	};
+  /**
+   * Input port that takes in a number.
+   */
+  public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
+  {
+    /**
+     * Process each key, compute change or percent, and emit it. If we get 0 as
+     * tuple next will be skipped.
+     */
+    @Override
+    public void process(V tuple)
+    {
+      double tval = tuple.doubleValue();
+      if (baseValue == 0) { // Avoid divide by zero, Emit an error tuple?
+        baseValue = tval;
+        return;
+      }
+      double change = tval - baseValue;
+      double percent = (change / baseValue) * 100;
+      if (percent < 0.0) {
+        percent = 0.0 - percent;
+      }
+      if (percent > percentThreshold) {
+        KeyValPair<V, Double> kv = new KeyValPair<V, Double>(cloneKey(tuple),
+            percent);
+        alert.emit(kv);
+      }
+      baseValue = tval;
+    }
+  };
 
 
-	/**
-	 * Output port which emits a key value pair.
-	 */
-	public final transient DefaultOutputPort<KeyValPair<V, Double>> alert = new DefaultOutputPort<KeyValPair<V, Double>>();
+  /**
+   * Output port which emits a key value pair.
+   */
+  public final transient DefaultOutputPort<KeyValPair<V, Double>> alert = new DefaultOutputPort<KeyValPair<V, Double>>();
 
-	/**
-	 * baseValue is a state full field. It is retained across windows
-	 */
-	private double baseValue = 0;
-	@Min(1)
-	private double percentThreshold = 0.0;
+  /**
+   * baseValue is a state full field. It is retained across windows
+   */
+  private double baseValue = 0;
+  @Min(1)
+  private double percentThreshold = 0.0;
 
-	/**
-	 * getter function for threshold value
-	 *
-	 * @return threshold value
-	 */
-	@Min(1)
-	public double getPercentThreshold()
-	{
-		return percentThreshold;
-	}
+  /**
+   * getter function for threshold value
+   *
+   * @return threshold value
+   */
+  @Min(1)
+  public double getPercentThreshold()
+  {
+    return percentThreshold;
+  }
 
-	/**
-	 * setter function for threshold value
-	 */
-	public void setPercentThreshold(double d)
-	{
-		percentThreshold = d;
-	}
+  /**
+   * setter function for threshold value
+   */
+  public void setPercentThreshold(double d)
+  {
+    percentThreshold = d;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java
index 43e098f..b0d2e77 100644
--- a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java
@@ -52,78 +52,78 @@ import com.datatorrent.lib.util.KeyValPair;
  * @since 0.3.3
  */
 public class ChangeAlertKeyVal<K, V extends Number> extends
-		BaseNumberKeyValueOperator<K, V>
+    BaseNumberKeyValueOperator<K, V>
 {
-	/**
-	 * Base map is a StateFull field. It is retained across windows
-	 */
-	private HashMap<K, MutableDouble> basemap = new HashMap<K, MutableDouble>();
+  /**
+   * Base map is a StateFull field. It is retained across windows
+   */
+  private HashMap<K, MutableDouble> basemap = new HashMap<K, MutableDouble>();
 
-	/**
-	 * Input data port that takes a key value pair.
-	 */
-	public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
-	{
-		/**
-		 * Process each key, compute change or percent, and emit it.
-		 */
-		@Override
-		public void process(KeyValPair<K, V> tuple)
-		{
-			K key = tuple.getKey();
-			double tval = tuple.getValue().doubleValue();
-			MutableDouble val = basemap.get(key);
-			if (!doprocessKey(key)) {
-				return;
-			}
-			if (val == null) { // Only process keys that are in the basemap
-				val = new MutableDouble(tval);
-				basemap.put(cloneKey(key), val);
-				return;
-			}
-			double change = tval - val.doubleValue();
-			double percent = (change / val.doubleValue()) * 100;
-			if (percent < 0.0) {
-				percent = 0.0 - percent;
-			}
-			if (percent > percentThreshold) {
-				KeyValPair<V, Double> dmap = new KeyValPair<V, Double>(
-						cloneValue(tuple.getValue()), percent);
-				KeyValPair<K, KeyValPair<V, Double>> otuple = new KeyValPair<K, KeyValPair<V, Double>>(
-						cloneKey(key), dmap);
-				alert.emit(otuple);
-			}
-			val.setValue(tval);
-		}
-	};
+  /**
+   * Input data port that takes a key value pair.
+   */
+  public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
+  {
+    /**
+     * Process each key, compute change or percent, and emit it.
+     */
+    @Override
+    public void process(KeyValPair<K, V> tuple)
+    {
+      K key = tuple.getKey();
+      double tval = tuple.getValue().doubleValue();
+      MutableDouble val = basemap.get(key);
+      if (!doprocessKey(key)) {
+        return;
+      }
+      if (val == null) { // Only process keys that are in the basemap
+        val = new MutableDouble(tval);
+        basemap.put(cloneKey(key), val);
+        return;
+      }
+      double change = tval - val.doubleValue();
+      double percent = (change / val.doubleValue()) * 100;
+      if (percent < 0.0) {
+        percent = 0.0 - percent;
+      }
+      if (percent > percentThreshold) {
+        KeyValPair<V, Double> dmap = new KeyValPair<V, Double>(
+            cloneValue(tuple.getValue()), percent);
+        KeyValPair<K, KeyValPair<V, Double>> otuple = new KeyValPair<K, KeyValPair<V, Double>>(
+            cloneKey(key), dmap);
+        alert.emit(otuple);
+      }
+      val.setValue(tval);
+    }
+  };
 
-	/**
-	 * Key,Percent Change output port.
-	 */
-	public final transient DefaultOutputPort<KeyValPair<K, KeyValPair<V, Double>>> alert = new DefaultOutputPort<KeyValPair<K, KeyValPair<V, Double>>>();
+  /**
+   * Key,Percent Change output port.
+   */
+  public final transient DefaultOutputPort<KeyValPair<K, KeyValPair<V, Double>>> alert = new DefaultOutputPort<KeyValPair<K, KeyValPair<V, Double>>>();
 
-	/**
-	 * Alert thresh hold percentage set by application.
-	 */
-	@Min(1)
-	private double percentThreshold = 0.0;
+  /**
+   * Alert thresh hold percentage set by application.
+   */
+  @Min(1)
+  private double percentThreshold = 0.0;
 
-	/**
-	 * getter function for threshold value
-	 *
-	 * @return threshold value
-	 */
-	@Min(1)
-	public double getPercentThreshold()
-	{
-		return percentThreshold;
-	}
+  /**
+   * getter function for threshold value
+   *
+   * @return threshold value
+   */
+  @Min(1)
+  public double getPercentThreshold()
+  {
+    return percentThreshold;
+  }
 
-	/**
-	 * setter function for threshold value
-	 */
-	public void setPercentThreshold(double d)
-	{
-		percentThreshold = d;
-	}
+  /**
+   * setter function for threshold value
+   */
+  public void setPercentThreshold(double d)
+  {
+    percentThreshold = d;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java b/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java
index ebf16d1..e212a2d 100644
--- a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java
+++ b/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java
@@ -74,7 +74,7 @@ public class ChangeAlertMap<K, V extends Number> extends BaseNumberKeyValueOpera
           continue;
         }
         double change = e.getValue().doubleValue() - val.doubleValue();
-        double percent = (change/val.doubleValue())*100;
+        double percent = (change / val.doubleValue()) * 100;
         if (percent < 0.0) {
           percent = 0.0 - percent;
         }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java
index 2e406ec..3f77052 100644
--- a/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java
@@ -54,8 +54,7 @@ import com.datatorrent.lib.util.KeyValPair;
  * @tags change, key value
  * @since 0.3.3
  */
-public class ChangeKeyVal<K, V extends Number> extends
-  BaseNumberKeyValueOperator<K, V>
+public class ChangeKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K, V>
 {
   /**
    * basemap is a stateful field. It is retained across windows
@@ -81,8 +80,7 @@ public class ChangeKeyVal<K, V extends Number> extends
       if (bval != null) { // Only process keys that are in the basemap
         double cval = tuple.getValue().doubleValue() - bval.doubleValue();
         change.emit(new KeyValPair<K, V>(cloneKey(key), getValue(cval)));
-        percent.emit(new KeyValPair<K, Double>(cloneKey(key), (cval / bval
-          .doubleValue()) * 100));
+        percent.emit(new KeyValPair<K, Double>(cloneKey(key), (cval / bval.doubleValue()) * 100));
       }
     }
   };

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java b/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java
index 0573e3e..66bd7da 100644
--- a/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java
+++ b/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java
@@ -18,13 +18,14 @@
  */
 package com.datatorrent.lib.math;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.lib.algo.MatchMap;
 import com.datatorrent.lib.util.UnifierHashMap;
-import java.util.HashMap;
-import java.util.Map;
 
 /**
  * Operator compares based on the property "key", "value", and "compare".
@@ -86,13 +87,13 @@ public class CompareExceptMap<K, V extends Number> extends MatchMap<K, V>
   /**
    * Output port that emits a hashmap of matched tuples after comparison.
    */
-  @OutputPortFieldAnnotation(optional=true)
+  @OutputPortFieldAnnotation(optional = true)
   public final transient DefaultOutputPort<HashMap<K, V>> compare = match;
   
   /**
    * Output port that emits a hashmap of non matching tuples after comparison.
    */
-  @OutputPortFieldAnnotation(optional=true)
+  @OutputPortFieldAnnotation(optional = true)
   public final transient DefaultOutputPort<HashMap<K, V>> except = new DefaultOutputPort<HashMap<K, V>>()
   {
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/CompareMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/CompareMap.java b/library/src/main/java/com/datatorrent/lib/math/CompareMap.java
index 540f756..3636207 100644
--- a/library/src/main/java/com/datatorrent/lib/math/CompareMap.java
+++ b/library/src/main/java/com/datatorrent/lib/math/CompareMap.java
@@ -18,10 +18,11 @@
  */
 package com.datatorrent.lib.math;
 
+import java.util.HashMap;
+
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.lib.algo.MatchMap;
-import java.util.HashMap;
 
 /**
  * This operator compares tuples subclassed from Number based on the property "key", "value", and "cmp", and matching tuples are emitted.
@@ -78,8 +79,8 @@ import java.util.HashMap;
 @Stateless
 public class CompareMap<K, V extends Number> extends MatchMap<K,V>
 {
-    /**
-     * Output port that emits a hashmap of matching number tuples after comparison.
-     */
-    public final transient DefaultOutputPort<HashMap<K, V>> compare = match;
+  /**
+   * Output port that emits a hashmap of matching number tuples after comparison.
+   */
+  public final transient DefaultOutputPort<HashMap<K, V>> compare = match;
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java
index 64c5029..d593020 100644
--- a/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java
@@ -50,65 +50,65 @@ import com.datatorrent.lib.util.UnifierCountOccurKey;
 public class CountKeyVal<K, V> extends BaseKeyValueOperator<K, V>
 {
 
-	/**
-	 * Key occurrence count map.
-	 */
-	protected HashMap<K, MutableInt> counts = new HashMap<K, MutableInt>();
+  /**
+   * Key occurrence count map.
+   */
+  protected HashMap<K, MutableInt> counts = new HashMap<K, MutableInt>();
 
-	/**
-	 * Input data port that takes key value pair.
-	 */
-	public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
-	{
-		/**
-		 * For each tuple (a key value pair): Adds the values for each key, Counts
-		 * the number of occurrence of each key
-		 */
-		@Override
-		public void process(KeyValPair<K, V> tuple)
-		{
-			K key = tuple.getKey();
-			MutableInt count = counts.get(key);
-			if (count == null) {
-				count = new MutableInt(0);
-				counts.put(cloneKey(key), count);
-			}
-			count.increment();
-		}
+  /**
+   * Input data port that takes key value pair.
+   */
+  public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
+  {
+    /**
+     * For each tuple (a key value pair): Adds the values for each key, Counts
+     * the number of occurrence of each key
+     */
+    @Override
+    public void process(KeyValPair<K, V> tuple)
+    {
+      K key = tuple.getKey();
+      MutableInt count = counts.get(key);
+      if (count == null) {
+        count = new MutableInt(0);
+        counts.put(cloneKey(key), count);
+      }
+      count.increment();
+    }
 
-		@Override
-		public StreamCodec<KeyValPair<K, V>> getStreamCodec()
-		{
-			return getKeyValPairStreamCodec();
-		}
-	};
+    @Override
+    public StreamCodec<KeyValPair<K, V>> getStreamCodec()
+    {
+      return getKeyValPairStreamCodec();
+    }
+  };
 
-	/**
-	 * Key, occurrence value pair output port.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<KeyValPair<K, Integer>> count = new DefaultOutputPort<KeyValPair<K, Integer>>()
-	{
-		@Override
-		public UnifierCountOccurKey<K> getUnifier()
-		{
-			return new UnifierCountOccurKey<K>();
-		}
-	};
+  /**
+   * Key, occurrence value pair output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<K, Integer>> count = new DefaultOutputPort<KeyValPair<K, Integer>>()
+  {
+    @Override
+    public UnifierCountOccurKey<K> getUnifier()
+    {
+      return new UnifierCountOccurKey<K>();
+    }
+  };
 
-	/**
-	 * Emits on all ports that are connected. Data is computed during process on
-	 * input port and endWindow just emits it for each key. Clears the internal
-	 * data if resetAtEndWindow is true.
-	 */
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	@Override
-	public void endWindow()
-	{
-		for (Map.Entry<K, MutableInt> e : counts.entrySet()) {
-			count.emit(new KeyValPair(e.getKey(),
-					new Integer(e.getValue().intValue())));
-		}
-		counts.clear();
-	}
+  /**
+   * Emits on all ports that are connected. Data is computed during process on
+   * input port and endWindow just emits it for each key. Clears the internal
+   * data if resetAtEndWindow is true.
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Override
+  public void endWindow()
+  {
+    for (Map.Entry<K, MutableInt> e : counts.entrySet()) {
+      count.emit(new KeyValPair(e.getKey(),
+          new Integer(e.getValue().intValue())));
+    }
+    counts.clear();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Division.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Division.java b/library/src/main/java/com/datatorrent/lib/math/Division.java
index 5bbb9a9..d05af18 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Division.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Division.java
@@ -20,10 +20,10 @@ package com.datatorrent.lib.math;
 
 import java.util.ArrayList;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * This operator does division metric on consecutive tuples on ports.
@@ -54,9 +54,9 @@ import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
  */
 public class Division extends BaseOperator
 {
-	/**
-	 * Array to store numerator inputs during window.
-	 */
+  /**
+   * Array to store numerator inputs during window.
+   */
   private ArrayList<Number> numer = new ArrayList<Number>();
   
   /**
@@ -83,7 +83,7 @@ public class Division extends BaseOperator
         if (loc > numer.size()) {
           loc = numer.size();
         }
-        emit(numer.get(loc-1), denom.get(loc-1));
+        emit(numer.get(loc - 1), denom.get(loc - 1));
         index++;
       }
     }
@@ -107,7 +107,7 @@ public class Division extends BaseOperator
         if (loc > numer.size()) {
           loc = numer.size();
         }
-        emit(numer.get(loc-1), denom.get(loc-1));
+        emit(numer.get(loc - 1), denom.get(loc - 1));
         index++;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java b/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java
index 84b10b8..ddef880 100644
--- a/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java
+++ b/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java
@@ -68,35 +68,35 @@ public class ExceptMap<K, V extends Number> extends MatchMap<K, V>
         /**
          * Output port that emits non matching number tuples.
          */
-	public final transient DefaultOutputPort<HashMap<K, V>> except = new DefaultOutputPort<HashMap<K, V>>()
-	{
-		@Override
-		public Unifier<HashMap<K, V>> getUnifier()
-		{
-			return new UnifierHashMap<K, V>();
-		}
-	};
+  public final transient DefaultOutputPort<HashMap<K, V>> except = new DefaultOutputPort<HashMap<K, V>>()
+  {
+    @Override
+    public Unifier<HashMap<K, V>> getUnifier()
+    {
+      return new UnifierHashMap<K, V>();
+    }
+  };
 
-	/**
-	 * Does nothing. Overrides base as call super.tupleMatched() would emit the
-	 * tuple
-	 * 
-	 * @param tuple
-	 */
-	@Override
-	public void tupleMatched(Map<K, V> tuple)
-	{
-	}
+  /**
+   * Does nothing. Overrides base as call super.tupleMatched() would emit the
+   * tuple
+   *
+   * @param tuple
+   */
+  @Override
+  public void tupleMatched(Map<K, V> tuple)
+  {
+  }
 
-	/**
-	 * Emits the tuple. Calls cloneTuple to get a copy, allowing users to override
-	 * in case objects are mutable
-	 * 
-	 * @param tuple
-	 */
-	@Override
-	public void tupleNotMatched(Map<K, V> tuple)
-	{
-		except.emit(cloneTuple(tuple));
-	}
+  /**
+   * Emits the tuple. Calls cloneTuple to get a copy, allowing users to override
+   * in case objects are mutable
+   *
+   * @param tuple
+   */
+  @Override
+  public void tupleNotMatched(Map<K, V> tuple)
+  {
+    except.emit(cloneTuple(tuple));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/LogicalCompare.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/LogicalCompare.java b/library/src/main/java/com/datatorrent/lib/math/LogicalCompare.java
index 0b7e036..41ce5a0 100644
--- a/library/src/main/java/com/datatorrent/lib/math/LogicalCompare.java
+++ b/library/src/main/java/com/datatorrent/lib/math/LogicalCompare.java
@@ -18,10 +18,10 @@
  */
 package com.datatorrent.lib.math;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.common.util.Pair;
 
 /**
@@ -29,7 +29,7 @@ import com.datatorrent.common.util.Pair;
  * <p>
  * If the first value is equal to second value, then the pair is emitted on equalTo, greaterThanEqualTo, and lessThanEqualTo ports.
  * If the first value is less than second value, then the pair is emitted on notEqualTo, lessThan and lessThanEqualTo ports.
- * If the first value is greater than second value, then the pair is emitted on notEqualTo, greaterThan and greaterThanEqualTo ports. 
+ * If the first value is greater than second value, then the pair is emitted on notEqualTo, greaterThan and greaterThanEqualTo ports.
  * This is a pass through operator.
  * <br>
  * StateFull : No, output is computed during current window. <br>
@@ -51,61 +51,61 @@ import com.datatorrent.common.util.Pair;
  */
 @Stateless
 public abstract class LogicalCompare<T extends Comparable<? super T>> extends
-		BaseOperator
+    BaseOperator
 {
-	/**
-	 * Input port that takes a key, value pair for comparison.
-	 */
-	public final transient DefaultInputPort<Pair<T, T>> input = new DefaultInputPort<Pair<T, T>>()
-	{
-		@Override
-		public void process(Pair<T, T> tuple)
-		{
-			int i = tuple.first.compareTo(tuple.second);
-			if (i > 0) {
-				greaterThan.emit(tuple);
-				greaterThanOrEqualTo.emit(tuple);
-				notEqualTo.emit(tuple);
-			} else if (i < 0) {
-				lessThan.emit(tuple);
-				lessThanOrEqualTo.emit(tuple);
-				notEqualTo.emit(tuple);
-			} else {
-				equalTo.emit(tuple);
-				lessThanOrEqualTo.emit(tuple);
-				greaterThanOrEqualTo.emit(tuple);
-			}
-		}
+  /**
+   * Input port that takes a key, value pair for comparison.
+   */
+  public final transient DefaultInputPort<Pair<T, T>> input = new DefaultInputPort<Pair<T, T>>()
+  {
+    @Override
+    public void process(Pair<T, T> tuple)
+    {
+      int i = tuple.first.compareTo(tuple.second);
+      if (i > 0) {
+        greaterThan.emit(tuple);
+        greaterThanOrEqualTo.emit(tuple);
+        notEqualTo.emit(tuple);
+      } else if (i < 0) {
+        lessThan.emit(tuple);
+        lessThanOrEqualTo.emit(tuple);
+        notEqualTo.emit(tuple);
+      } else {
+        equalTo.emit(tuple);
+        lessThanOrEqualTo.emit(tuple);
+        greaterThanOrEqualTo.emit(tuple);
+      }
+    }
 
-	};
+  };
 
-	/**
-	 * Equal output port.
-	 */
-	public final transient DefaultOutputPort<Pair<T, T>> equalTo = new DefaultOutputPort<Pair<T, T>>();
+  /**
+   * Equal output port.
+   */
+  public final transient DefaultOutputPort<Pair<T, T>> equalTo = new DefaultOutputPort<Pair<T, T>>();
 
-	/**
-	 * Not Equal output port.
-	 */
-	public final transient DefaultOutputPort<Pair<T, T>> notEqualTo = new DefaultOutputPort<Pair<T, T>>();
+  /**
+   * Not Equal output port.
+   */
+  public final transient DefaultOutputPort<Pair<T, T>> notEqualTo = new DefaultOutputPort<Pair<T, T>>();
 
-	/**
-	 * Less than output port.
-	 */
-	public final transient DefaultOutputPort<Pair<T, T>> lessThan = new DefaultOutputPort<Pair<T, T>>();
+  /**
+   * Less than output port.
+   */
+  public final transient DefaultOutputPort<Pair<T, T>> lessThan = new DefaultOutputPort<Pair<T, T>>();
 
-	/**
-	 * Greater than output port.
-	 */
-	public final transient DefaultOutputPort<Pair<T, T>> greaterThan = new DefaultOutputPort<Pair<T, T>>();
+  /**
+   * Greater than output port.
+   */
+  public final transient DefaultOutputPort<Pair<T, T>> greaterThan = new DefaultOutputPort<Pair<T, T>>();
 
-	/**
-	 * Less than equal to output port.
-	 */
-	public final transient DefaultOutputPort<Pair<T, T>> lessThanOrEqualTo = new DefaultOutputPort<Pair<T, T>>();
+  /**
+   * Less than equal to output port.
+   */
+  public final transient DefaultOutputPort<Pair<T, T>> lessThanOrEqualTo = new DefaultOutputPort<Pair<T, T>>();
 
-	/**
-	 * Greater than equal to output port.
-	 */
-	public final transient DefaultOutputPort<Pair<T, T>> greaterThanOrEqualTo = new DefaultOutputPort<Pair<T, T>>();
+  /**
+   * Greater than equal to output port.
+   */
+  public final transient DefaultOutputPort<Pair<T, T>> greaterThanOrEqualTo = new DefaultOutputPort<Pair<T, T>>();
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/LogicalCompareToConstant.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/LogicalCompareToConstant.java b/library/src/main/java/com/datatorrent/lib/math/LogicalCompareToConstant.java
index 5e98eae..659a287 100644
--- a/library/src/main/java/com/datatorrent/lib/math/LogicalCompareToConstant.java
+++ b/library/src/main/java/com/datatorrent/lib/math/LogicalCompareToConstant.java
@@ -18,10 +18,10 @@
  */
 package com.datatorrent.lib.math;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * This operator does a logical comparison of a constant with a tuple.
@@ -54,78 +54,78 @@ import com.datatorrent.api.annotation.Stateless;
  */
 @Stateless
 public class LogicalCompareToConstant<T extends Comparable<? super T>> extends
-		BaseOperator
+    BaseOperator
 {
 
-	/**
-	 * Compare constant, set by application.
-	 */
-	private T constant;
+  /**
+   * Compare constant, set by application.
+   */
+  private T constant;
 
-	/**
-	 * Input port that takes a comparable to compare it with a constant.
-	 */
-	public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
-	{
-		@Override
-		public void process(T tuple)
-		{
-			int i = constant.compareTo(tuple);
-			if (i > 0) {
-				greaterThan.emit(tuple);
-				greaterThanOrEqualTo.emit(tuple);
-				notEqualTo.emit(tuple);
-			} else if (i < 0) {
-				lessThan.emit(tuple);
-				lessThanOrEqualTo.emit(tuple);
-				notEqualTo.emit(tuple);
-			} else {
-				equalTo.emit(tuple);
-				lessThanOrEqualTo.emit(tuple);
-				greaterThanOrEqualTo.emit(tuple);
-			}
-		}
+  /**
+   * Input port that takes a comparable to compare it with a constant.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public void process(T tuple)
+    {
+      int i = constant.compareTo(tuple);
+      if (i > 0) {
+        greaterThan.emit(tuple);
+        greaterThanOrEqualTo.emit(tuple);
+        notEqualTo.emit(tuple);
+      } else if (i < 0) {
+        lessThan.emit(tuple);
+        lessThanOrEqualTo.emit(tuple);
+        notEqualTo.emit(tuple);
+      } else {
+        equalTo.emit(tuple);
+        lessThanOrEqualTo.emit(tuple);
+        greaterThanOrEqualTo.emit(tuple);
+      }
+    }
 
-	};
+  };
 
-	/**
-	 * Equal output port.
-	 */
-	public final transient DefaultOutputPort<T> equalTo = new DefaultOutputPort<T>();
+  /**
+   * Equal output port.
+   */
+  public final transient DefaultOutputPort<T> equalTo = new DefaultOutputPort<T>();
 
-	/**
-	 * Not Equal output port.
-	 */
-	public final transient DefaultOutputPort<T> notEqualTo = new DefaultOutputPort<T>();
+  /**
+   * Not Equal output port.
+   */
+  public final transient DefaultOutputPort<T> notEqualTo = new DefaultOutputPort<T>();
 
-	/**
-	 * Less Than output port.
-	 */
-	public final transient DefaultOutputPort<T> lessThan = new DefaultOutputPort<T>();
+  /**
+   * Less Than output port.
+   */
+  public final transient DefaultOutputPort<T> lessThan = new DefaultOutputPort<T>();
 
-	/**
-	 * Greater than output port.
-	 */
-	public final transient DefaultOutputPort<T> greaterThan = new DefaultOutputPort<T>();
-	public final transient DefaultOutputPort<T> lessThanOrEqualTo = new DefaultOutputPort<T>();
-	public final transient DefaultOutputPort<T> greaterThanOrEqualTo = new DefaultOutputPort<T>();
+  /**
+   * Greater than output port.
+   */
+  public final transient DefaultOutputPort<T> greaterThan = new DefaultOutputPort<T>();
+  public final transient DefaultOutputPort<T> lessThanOrEqualTo = new DefaultOutputPort<T>();
+  public final transient DefaultOutputPort<T> greaterThanOrEqualTo = new DefaultOutputPort<T>();
 
-	/**
-	 * Set constant for comparison.
-	 * 
-	 * @param constant
-	 *          the constant to set
-	 */
-	public void setConstant(T constant)
-	{
-		this.constant = constant;
-	}
+  /**
+   * Set constant for comparison.
+   *
+   * @param constant
+   *          the constant to set
+   */
+  public void setConstant(T constant)
+  {
+    this.constant = constant;
+  }
 
-	/**
-	 * returns the value of constant
-	 */
-	public T getConstant()
-	{
-		return constant;
-	}
+  /**
+   * returns the value of constant
+   */
+  public T getConstant()
+  {
+    return constant;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Margin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Margin.java b/library/src/main/java/com/datatorrent/lib/math/Margin.java
index 1161ba8..94e15d6 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Margin.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Margin.java
@@ -50,92 +50,92 @@ import com.datatorrent.lib.util.BaseNumberValueOperator;
 @OperatorAnnotation(partitionable = false)
 public class Margin<V extends Number> extends BaseNumberValueOperator<V>
 {
-	/**
-	 * Sum of numerator values.
-	 */
-	protected double nval = 0.0;
+  /**
+   * Sum of numerator values.
+   */
+  protected double nval = 0.0;
 
-	/**
-	 * sum of denominator values.
-	 */
-	protected double dval = 0.0;
+  /**
+   * sum of denominator values.
+   */
+  protected double dval = 0.0;
 
-	/**
-	 * Flag to output margin as percentage.
-	 */
-	protected boolean percent = false;
+  /**
+   * Flag to output margin as percentage.
+   */
+  protected boolean percent = false;
 
-	/**
-	 * Numerator input port.
-	 */
-	public final transient DefaultInputPort<V> numerator = new DefaultInputPort<V>()
-	{
-		/**
-		 * Adds to the numerator value
-		 */
-		@Override
-		public void process(V tuple)
-		{
-			nval += tuple.doubleValue();
-		}
-	};
+  /**
+   * Numerator input port.
+   */
+  public final transient DefaultInputPort<V> numerator = new DefaultInputPort<V>()
+  {
+    /**
+     * Adds to the numerator value
+     */
+    @Override
+    public void process(V tuple)
+    {
+      nval += tuple.doubleValue();
+    }
+  };
 
-	/**
-	 * Denominator input port.
-	 */
-	public final transient DefaultInputPort<V> denominator = new DefaultInputPort<V>()
-	{
-		/**
-		 * Adds to the denominator value
-		 */
-		@Override
-		public void process(V tuple)
-		{
-			dval += tuple.doubleValue();
-		}
-	};
+  /**
+   * Denominator input port.
+   */
+  public final transient DefaultInputPort<V> denominator = new DefaultInputPort<V>()
+  {
+    /**
+     * Adds to the denominator value
+     */
+    @Override
+    public void process(V tuple)
+    {
+      dval += tuple.doubleValue();
+    }
+  };
 
-	/**
-	 * Output margin port.
-	 */
-	public final transient DefaultOutputPort<V> margin = new DefaultOutputPort<V>();
+  /**
+   * Output margin port.
+   */
+  public final transient DefaultOutputPort<V> margin = new DefaultOutputPort<V>();
 
-	/**
-	 * getter function for percent
-	 * 
-	 * @return percent
-	 */
-	public boolean getPercent()
-	{
-		return percent;
-	}
+  /**
+   * getter function for percent
+   *
+   * @return percent
+   */
+  public boolean getPercent()
+  {
+    return percent;
+  }
 
-	/**
-	 * setter function for percent
-	 * 
-	 * @param val
-	 *          sets percent
-	 */
-	public void setPercent(boolean val)
-	{
-		percent = val;
-	}
+  /**
+   * setter function for percent
+   *
+   * @param val
+   *          sets percent
+   */
+  public void setPercent(boolean val)
+  {
+    percent = val;
+  }
 
-	/**
-	 * Generates tuple emits it as long as denomitor is not 0 Clears internal data
-	 */
-	@Override
-	public void endWindow()
-	{
-		if (dval == 0) {
-			return;
-		}
-		double val = 1 - (nval / dval);
-		if (percent) {
-			val = val * 100;
-		}
-		margin.emit(getValue(val));
-		nval = 0.0;
-		dval = 0.0;
-	}
+  /**
+   * Generates tuple emits it as long as denomitor is not 0 Clears internal data
+   */
+  @Override
+  public void endWindow()
+  {
+    if (dval == 0) {
+      return;
+    }
+    double val = 1 - (nval / dval);
+    if (percent) {
+      val = val * 100;
+    }
+    margin.emit(getValue(val));
+    nval = 0.0;
+    dval = 0.0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/MarginKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/MarginKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/MarginKeyVal.java
index 29d35bc..e3af508 100644
--- a/library/src/main/java/com/datatorrent/lib/math/MarginKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/MarginKeyVal.java
@@ -23,12 +23,11 @@ import java.util.Map;
 
 import org.apache.commons.lang.mutable.MutableDouble;
 
-import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
+import com.datatorrent.lib.util.KeyValPair;
 
 /**
  *
@@ -52,135 +51,134 @@ import com.datatorrent.api.StreamCodec;
  * @tags sum, division, numeric, key value
  * @since 0.3.3
  */
-public class MarginKeyVal<K, V extends Number> extends
-		BaseNumberKeyValueOperator<K, V>
+public class MarginKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K, V>
 {
         /**
-	 * Numerator input port that takes a key value pair.
-	 */
-	public final transient DefaultInputPort<KeyValPair<K, V>> numerator = new DefaultInputPort<KeyValPair<K, V>>()
-	{
-		/**
-		 * Adds tuple to the numerator hash.
-		 */
-		@Override
-		public void process(KeyValPair<K, V> tuple)
-		{
-			addTuple(tuple, numerators);
-		}
-
-		/**
-		 * Set StreamCodec used for partitioning.
-		 */
-		@Override
-		public StreamCodec<KeyValPair<K, V>> getStreamCodec()
-		{
-			return getKeyValPairStreamCodec();
-		}
-	};
+   * Numerator input port that takes a key value pair.
+   */
+  public final transient DefaultInputPort<KeyValPair<K, V>> numerator = new DefaultInputPort<KeyValPair<K, V>>()
+  {
+    /**
+     * Adds tuple to the numerator hash.
+     */
+    @Override
+    public void process(KeyValPair<K, V> tuple)
+    {
+      addTuple(tuple, numerators);
+    }
+
+    /**
+     * Set StreamCodec used for partitioning.
+     */
+    @Override
+    public StreamCodec<KeyValPair<K, V>> getStreamCodec()
+    {
+      return getKeyValPairStreamCodec();
+    }
+  };
 
         /**
-	 * Denominator input port that takes a key value pair.
-	 */
-	public final transient DefaultInputPort<KeyValPair<K, V>> denominator = new DefaultInputPort<KeyValPair<K, V>>()
-	{
-		/**
-		 * Adds tuple to the denominator hash.
-		 */
-		@Override
-		public void process(KeyValPair<K, V> tuple)
-		{
-			addTuple(tuple, denominators);
-		}
-
-		/**
-		 * Set StreamCodec used for partitioning.
-		 */
-		@Override
-		public StreamCodec<KeyValPair<K, V>> getStreamCodec()
-		{
-			return getKeyValPairStreamCodec();
-		}
-	};
-
-	/**
-	 * Adds the value for each key.
-	 *
-	 * @param tuple
-	 * @param map
-	 */
-	public void addTuple(KeyValPair<K, V> tuple, Map<K, MutableDouble> map)
-	{
-		K key = tuple.getKey();
-		if (!doprocessKey(key) || (tuple.getValue() == null)) {
-			return;
-		}
-		MutableDouble val = map.get(key);
-		if (val == null) {
-			val = new MutableDouble(0.0);
-			map.put(cloneKey(key), val);
-		}
-		val.add(tuple.getValue().doubleValue());
-	}
+   * Denominator input port that takes a key value pair.
+   */
+  public final transient DefaultInputPort<KeyValPair<K, V>> denominator = new DefaultInputPort<KeyValPair<K, V>>()
+  {
+    /**
+     * Adds tuple to the denominator hash.
+     */
+    @Override
+    public void process(KeyValPair<K, V> tuple)
+    {
+      addTuple(tuple, denominators);
+    }
+
+    /**
+     * Set StreamCodec used for partitioning.
+     */
+    @Override
+    public StreamCodec<KeyValPair<K, V>> getStreamCodec()
+    {
+      return getKeyValPairStreamCodec();
+    }
+  };
+
+  /**
+   * Adds the value for each key.
+   *
+   * @param tuple
+   * @param map
+   */
+  public void addTuple(KeyValPair<K, V> tuple, Map<K, MutableDouble> map)
+  {
+    K key = tuple.getKey();
+    if (!doprocessKey(key) || (tuple.getValue() == null)) {
+      return;
+    }
+    MutableDouble val = map.get(key);
+    if (val == null) {
+      val = new MutableDouble(0.0);
+      map.put(cloneKey(key), val);
+    }
+    val.add(tuple.getValue().doubleValue());
+  }
 
         /**
-	 * Output margin port that emits Key Value pairs.
-	 */
-	public final transient DefaultOutputPort<KeyValPair<K, V>> margin = new DefaultOutputPort<KeyValPair<K, V>>();
-
-	protected HashMap<K, MutableDouble> numerators = new HashMap<K, MutableDouble>();
-	protected HashMap<K, MutableDouble> denominators = new HashMap<K, MutableDouble>();
-	protected boolean percent = false;
-
-	/**
-	 * getter function for percent
-	 *
-	 * @return percent
-	 */
-	public boolean getPercent()
-	{
-		return percent;
-	}
-
-	/**
-	 * setter function for percent
-	 *
-	 * @param val
-	 *          sets percent
-	 */
-	public void setPercent(boolean val)
-	{
-		percent = val;
-	}
-
-	/**
-	 * Generates tuples for each key and emits them. Only keys that are in the
-	 * denominator are iterated on If the key is only in the numerator, it gets
-	 * ignored (cannot do divide by 0) Clears internal data
-	 */
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Override
-	public void endWindow()
-	{
-		Double val;
-		for (Map.Entry<K, MutableDouble> e : denominators.entrySet()) {
-			K key = e.getKey();
-			MutableDouble nval = numerators.get(key);
-			if (nval == null) {
-				nval = new MutableDouble(0.0);
-			} else {
-				numerators.remove(key); // so that all left over keys can be reported
-			}
-			if (percent) {
-				val = (1 - nval.doubleValue() / e.getValue().doubleValue()) * 100;
-			} else {
-				val = 1 - nval.doubleValue() / e.getValue().doubleValue();
-			}
-
-			margin.emit(new KeyValPair(key, getValue(val.doubleValue())));
-		}
-
-		numerators.clear();
-		denominators.clear();
-	}
+   * Output margin port that emits Key Value pairs.
+   */
+  public final transient DefaultOutputPort<KeyValPair<K, V>> margin = new DefaultOutputPort<KeyValPair<K, V>>();
+
+  protected HashMap<K, MutableDouble> numerators = new HashMap<K, MutableDouble>();
+  protected HashMap<K, MutableDouble> denominators = new HashMap<K, MutableDouble>();
+  protected boolean percent = false;
+
+  /**
+   * getter function for percent
+   *
+   * @return percent
+   */
+  public boolean getPercent()
+  {
+    return percent;
+  }
+
+  /**
+   * setter function for percent
+   *
+   * @param val
+   *          sets percent
+   */
+  public void setPercent(boolean val)
+  {
+    percent = val;
+  }
+
+  /**
+   * Generates tuples for each key and emits them. Only keys that are in the
+   * denominator are iterated on If the key is only in the numerator, it gets
+   * ignored (cannot do divide by 0) Clears internal data
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Override
+  public void endWindow()
+  {
+    Double val;
+    for (Map.Entry<K, MutableDouble> e : denominators.entrySet()) {
+      K key = e.getKey();
+      MutableDouble nval = numerators.get(key);
+      if (nval == null) {
+        nval = new MutableDouble(0.0);
+      } else {
+        numerators.remove(key); // so that all left over keys can be reported
+      }
+      if (percent) {
+        val = (1 - nval.doubleValue() / e.getValue().doubleValue()) * 100;
+      } else {
+        val = 1 - nval.doubleValue() / e.getValue().doubleValue();
+      }
+
+      margin.emit(new KeyValPair(key, getValue(val.doubleValue())));
+    }
+
+    numerators.clear();
+    denominators.clear();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/MarginMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/MarginMap.java b/library/src/main/java/com/datatorrent/lib/math/MarginMap.java
index 2259d85..7ef1f81 100644
--- a/library/src/main/java/com/datatorrent/lib/math/MarginMap.java
+++ b/library/src/main/java/com/datatorrent/lib/math/MarginMap.java
@@ -18,13 +18,15 @@
  */
 package com.datatorrent.lib.math;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.mutable.MutableDouble;
+
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
 import com.datatorrent.lib.util.UnifierHashMap;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.commons.lang.mutable.MutableDouble;
 
 
 /**
@@ -144,18 +146,16 @@ public class MarginMap<K, V extends Number> extends BaseNumberKeyValueOperator<K
   {
     HashMap<K, V> tuples = new HashMap<K, V>();
     Double val;
-    for (Map.Entry<K, MutableDouble> e: denominators.entrySet()) {
+    for (Map.Entry<K, MutableDouble> e : denominators.entrySet()) {
       MutableDouble nval = numerators.get(e.getKey());
       if (nval == null) {
         nval = new MutableDouble(0.0);
-      }
-      else {
+      } else {
         numerators.remove(e.getKey()); // so that all left over keys can be reported
       }
       if (percent) {
         val = (1 - nval.doubleValue() / e.getValue().doubleValue()) * 100;
-      }
-      else {
+      } else {
         val = 1 - nval.doubleValue() / e.getValue().doubleValue();
       }
       tuples.put(e.getKey(), getValue(val.doubleValue()));

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Max.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Max.java b/library/src/main/java/com/datatorrent/lib/math/Max.java
index e4171f6..8ec8b2f 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Max.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Max.java
@@ -64,8 +64,7 @@ public class Max<V extends Number> extends BaseNumberValueOperator<V> implements
     if (!flag) {
       high = tuple;
       flag = true;
-    }
-    else if (high.doubleValue() < tuple.doubleValue()) {
+    } else if (high.doubleValue() < tuple.doubleValue()) {
       high = tuple;
     }
   }
@@ -74,7 +73,7 @@ public class Max<V extends Number> extends BaseNumberValueOperator<V> implements
    * Max value output port.
    */
   public final transient DefaultOutputPort<V> max = new DefaultOutputPort<V>()
-   {
+  {
     @Override
     public Unifier<V> getUnifier()
     {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/MaxKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/MaxKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/MaxKeyVal.java
index 58a947f..95a0a08 100644
--- a/library/src/main/java/com/datatorrent/lib/math/MaxKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/MaxKeyVal.java
@@ -21,12 +21,11 @@ package com.datatorrent.lib.math;
 import java.util.HashMap;
 import java.util.Map;
 
-import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
+import com.datatorrent.lib.util.KeyValPair;
 
 /**
  *
@@ -68,8 +67,7 @@ public class MaxKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K
       if (val == null) {
         val = tval;
         highs.put(cloneKey(key), val);
-      }
-      else if (val.doubleValue() < tval.doubleValue()) {
+      } else if (val.doubleValue() < tval.doubleValue()) {
         highs.put(key, tval);
       }
     }
@@ -97,7 +95,7 @@ public class MaxKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K
    * Clears internal data. Node only works in windowed mode.
    */
   @SuppressWarnings({ "rawtypes", "unchecked" })
-	@Override
+  @Override
   public void endWindow()
   {
     if (!highs.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Min.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Min.java b/library/src/main/java/com/datatorrent/lib/math/Min.java
index 244d990..4b3fa23 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Min.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Min.java
@@ -41,17 +41,17 @@ import com.datatorrent.lib.util.BaseNumberValueOperator;
  */
 public class Min<V extends Number> extends BaseNumberValueOperator<V> implements Unifier<V>
 {
-	/**
-	 * Computed low value.
-	 */
+  /**
+   * Computed low value.
+   */
   protected V low;
   
   // transient field
   protected boolean flag = false;
   
-	 /**
-          * Input port that takes a number and compares to min and stores the new min.
-          */
+  /**
+   * Input port that takes a number and compares to min and stores the new min.
+   */
   public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
   {
     /**
@@ -73,8 +73,7 @@ public class Min<V extends Number> extends BaseNumberValueOperator<V> implements
     if (!flag) {
       low = tuple;
       flag = true;
-    }
-    else if (low.doubleValue() > tuple.doubleValue()) {
+    } else if (low.doubleValue() > tuple.doubleValue()) {
       low = tuple;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/MinKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/MinKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/MinKeyVal.java
index 5ea710b..2468239 100644
--- a/library/src/main/java/com/datatorrent/lib/math/MinKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/MinKeyVal.java
@@ -21,12 +21,11 @@ package com.datatorrent.lib.math;
 import java.util.HashMap;
 import java.util.Map;
 
-import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
+import com.datatorrent.lib.util.KeyValPair;
 
 /**
  *
@@ -48,9 +47,9 @@ import com.datatorrent.api.StreamCodec;
  */
 public class MinKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K, V>
 {
-	/**
-	 * Input port which takes a key vaue pair and updates the value for each key if there is a new min.
-	 */
+  /**
+   * Input port which takes a key vaue pair and updates the value for each key if there is a new min.
+   */
   public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
   {
     /**
@@ -67,8 +66,7 @@ public class MinKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K
       V val = mins.get(key);
       if (val == null) {
         mins.put(cloneKey(key), tval);
-      }
-      else if (val.doubleValue() > tval.doubleValue()) {
+      } else if (val.doubleValue() > tval.doubleValue()) {
         mins.put(key, tval);
       }
     }
@@ -94,7 +92,7 @@ public class MinKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K
    * Clears internal data. Node only works in windowed mode.
    */
   @SuppressWarnings({ "unchecked", "rawtypes" })
-	@Override
+  @Override
   public void endWindow()
   {
     if (!mins.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java b/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java
index c1c70d3..dd56f7f 100644
--- a/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java
+++ b/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java
@@ -18,10 +18,10 @@
  */
 package com.datatorrent.lib.math;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * Multiplies input tuple (Number) by the value of property "multiplier" and emits the result on respective ports.
@@ -51,9 +51,9 @@ import com.datatorrent.api.annotation.Stateless;
 @Stateless
 public class MultiplyByConstant extends BaseOperator
 {
-	/**
-	 * Input number port.
-	 */
+  /**
+   * Input number port.
+   */
   public final transient DefaultInputPort<Number> input = new DefaultInputPort<Number>()
   {
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Quotient.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Quotient.java b/library/src/main/java/com/datatorrent/lib/math/Quotient.java
index d55f205..ed08e86 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Quotient.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Quotient.java
@@ -47,63 +47,63 @@ import com.datatorrent.lib.util.BaseNumberValueOperator;
 @OperatorAnnotation(partitionable = false)
 public class Quotient<V extends Number> extends BaseNumberValueOperator<V>
 {
-	protected double nval = 0.0;
-	protected double dval = 0.0;
-	int mult_by = 1;
+  protected double nval = 0.0;
+  protected double dval = 0.0;
+  int mult_by = 1;
 
-	/**
-	 * Numerator values input port.
-	 */
-	public final transient DefaultInputPort<V> numerator = new DefaultInputPort<V>()
-	{
-		/**
-		 * Adds to the numerator value
-		 */
-		@Override
-		public void process(V tuple)
-		{
-			nval += tuple.doubleValue();
-		}
-	};
+  /**
+   * Numerator values input port.
+   */
+  public final transient DefaultInputPort<V> numerator = new DefaultInputPort<V>()
+  {
+    /**
+     * Adds to the numerator value
+     */
+    @Override
+    public void process(V tuple)
+    {
+      nval += tuple.doubleValue();
+    }
+  };
 
-	/**
-	 * Denominator values input port.
-	 */
-	public final transient DefaultInputPort<V> denominator = new DefaultInputPort<V>()
-	{
-		/**
-		 * Adds to the denominator value
-		 */
-		@Override
-		public void process(V tuple)
-		{
-			dval += tuple.doubleValue();
-		}
-	};
+  /**
+   * Denominator values input port.
+   */
+  public final transient DefaultInputPort<V> denominator = new DefaultInputPort<V>()
+  {
+    /**
+     * Adds to the denominator value
+     */
+    @Override
+    public void process(V tuple)
+    {
+      dval += tuple.doubleValue();
+    }
+  };
 
-	/**
-	 * Quotient output port.
-	 */
-	public final transient DefaultOutputPort<V> quotient = new DefaultOutputPort<V>();
+  /**
+   * Quotient output port.
+   */
+  public final transient DefaultOutputPort<V> quotient = new DefaultOutputPort<V>();
 
-	public void setMult_by(int i)
-	{
-		mult_by = i;
-	}
+  public void setMult_by(int i)
+  {
+    mult_by = i;
+  }
 
-	/**
-	 * Generates tuple emits it as long as denominator is not 0. Clears internal
-	 * data
-	 */
-	@Override
-	public void endWindow()
-	{
-		if (dval == 0) {
-			return;
-		}
-		double val = (nval / dval) * mult_by;
-		quotient.emit(getValue(val));
-		nval = 0.0;
-		dval = 0.0;
-	}
+  /**
+   * Generates tuple emits it as long as denominator is not 0. Clears internal
+   * data
+   */
+  @Override
+  public void endWindow()
+  {
+    if (dval == 0) {
+      return;
+    }
+    double val = (nval / dval) * mult_by;
+    quotient.emit(getValue(val));
+    nval = 0.0;
+    dval = 0.0;
+  }
 }