You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/05/18 20:42:04 UTC
[14/22] incubator-apex-malhar git commit: APEXMALHAR-2095 removed
checkstyle violations of malhar library module
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/MultiWindowDimensionAggregation.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/MultiWindowDimensionAggregation.java b/library/src/main/java/com/datatorrent/lib/logs/MultiWindowDimensionAggregation.java
index 7c9c350..3cace73 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/MultiWindowDimensionAggregation.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/MultiWindowDimensionAggregation.java
@@ -27,9 +27,10 @@ import java.util.regex.Pattern;
import javax.validation.constraints.NotNull;
-import org.apache.commons.lang.mutable.MutableDouble;
-import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.mutable.MutableDouble;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
@@ -52,9 +53,10 @@ public class MultiWindowDimensionAggregation implements Operator
@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(MultiWindowDimensionAggregation.class);
- public enum AggregateOperation {
+ public enum AggregateOperation
+ {
SUM, AVERAGE
- };
+ }
private int windowSize = 2;
private int currentWindow = 0;
@@ -76,7 +78,8 @@ public class MultiWindowDimensionAggregation implements Operator
/**
* This is the input port which receives multi dimensional data.
*/
- public final transient DefaultInputPort<Map<String, Map<String, Number>>> data = new DefaultInputPort<Map<String, Map<String, Number>>>() {
+ public final transient DefaultInputPort<Map<String, Map<String, Number>>> data = new DefaultInputPort<Map<String, Map<String, Number>>>()
+ {
@Override
public void process(Map<String, Map<String, Number>> tuple)
{
@@ -169,12 +172,15 @@ public class MultiWindowDimensionAggregation implements Operator
@Override
public void setup(OperatorContext arg0)
{
- if (arg0 != null)
+ if (arg0 != null) {
applicationWindowSize = arg0.getValue(OperatorContext.APPLICATION_WINDOW_COUNT);
- if (cacheOject == null)
+ }
+ if (cacheOject == null) {
cacheOject = new HashMap<Integer, Map<String, Map<String, Number>>>(windowSize);
- if (outputMap == null)
+ }
+ if (outputMap == null) {
outputMap = new HashMap<String, Map<String, KeyValPair<MutableDouble, Integer>>>();
+ }
setUpPatternList();
}
@@ -238,8 +244,9 @@ public class MultiWindowDimensionAggregation implements Operator
}
}
currentWindowMap.clear();
- if (patternList == null || patternList.isEmpty())
+ if (patternList == null || patternList.isEmpty()) {
setUpPatternList();
+ }
}
@@ -255,12 +262,13 @@ public class MultiWindowDimensionAggregation implements Operator
outputData.put(e.getKey(), new DimensionObject<String>(keyVal.getKey(), dimensionValObj.getKey()));
} else if (operationType == AggregateOperation.AVERAGE) {
if (keyVal.getValue() != 0) {
- double totalCount = ((double) (totalWindowsOccupied * applicationWindowSize)) / 1000;
+ double totalCount = ((double)(totalWindowsOccupied * applicationWindowSize)) / 1000;
outputData.put(e.getKey(), new DimensionObject<String>(new MutableDouble(keyVal.getKey().doubleValue() / totalCount), dimensionValObj.getKey()));
}
}
- if (!outputData.isEmpty())
+ if (!outputData.isEmpty()) {
output.emit(outputData);
+ }
}
}
currentWindow = (currentWindow + 1) % windowSize;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/RegexMatchMapOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/logs/RegexMatchMapOperator.java b/library/src/main/java/com/datatorrent/lib/logs/RegexMatchMapOperator.java
index d64c634..e469b0c 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/RegexMatchMapOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/RegexMatchMapOperator.java
@@ -20,16 +20,19 @@ package com.datatorrent.lib.logs;
import java.util.HashMap;
import java.util.Map;
-import com.google.code.regexp.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.code.regexp.Matcher;
-import com.datatorrent.common.util.BaseOperator;
+import com.google.code.regexp.Pattern;
+
+import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.api.annotation.Stateless;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.datatorrent.common.util.BaseOperator;
/**
* This operator parses unstructured log data into named fields.
@@ -77,7 +80,7 @@ import org.slf4j.LoggerFactory;
* @since 1.0.5
*/
@Stateless
-@OperatorAnnotation(partitionable=true)
+@OperatorAnnotation(partitionable = true)
public class RegexMatchMapOperator extends BaseOperator
{
/**
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java b/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java
index 7f903fc..5f09a4b 100644
--- a/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java
+++ b/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java
@@ -18,10 +18,10 @@
*/
package com.datatorrent.lib.math;
-import com.datatorrent.api.DefaultInputPort;
-
import java.util.Collection;
+import com.datatorrent.api.DefaultInputPort;
+
/**
* Aggregates input tuples that are collections of longs and double and emits result on four ports.
* <p>
@@ -49,54 +49,52 @@ import java.util.Collection;
* @since 0.3.3
*/
public abstract class AbstractAggregateCalc<T extends Number> extends
- AbstractOutput
+ AbstractOutput
{
- /**
- * Input port, accepts collection of values of type 'T'.
- */
- public final transient DefaultInputPort<Collection<T>> input = new DefaultInputPort<Collection<T>>()
- {
- /**
- * Aggregate calculation result is only emitted on output port if it is connected.
- */
- @Override
- public void process(Collection<T> collection)
- {
- Double dResult = null;
- if (doubleResult.isConnected()) {
- doubleResult.emit(dResult = aggregateDoubles(collection));
- }
+ /**
+ * Input port, accepts collection of values of type 'T'.
+ */
+ public final transient DefaultInputPort<Collection<T>> input = new DefaultInputPort<Collection<T>>()
+ {
+ /**
+ * Aggregate calculation result is only emitted on output port if it is connected.
+ */
+ @Override
+ public void process(Collection<T> collection)
+ {
+ Double dResult = null;
+ if (doubleResult.isConnected()) {
+ doubleResult.emit(dResult = aggregateDoubles(collection));
+ }
- if (floatResult.isConnected()) {
- floatResult
- .emit(dResult == null ? (float) (aggregateDoubles(collection))
- : dResult.floatValue());
- }
+ if (floatResult.isConnected()) {
+ floatResult.emit(dResult == null ? (float)(aggregateDoubles(collection)) : dResult.floatValue());
+ }
- Long lResult = null;
- if (longResult.isConnected()) {
- longResult.emit(lResult = aggregateLongs(collection));
- }
+ Long lResult = null;
+ if (longResult.isConnected()) {
+ longResult.emit(lResult = aggregateLongs(collection));
+ }
- if (integerResult.isConnected()) {
- integerResult.emit(lResult == null ? (int) aggregateLongs(collection)
- : lResult.intValue());
- }
- }
+ if (integerResult.isConnected()) {
+ integerResult.emit(lResult == null ? (int)aggregateLongs(collection)
+ : lResult.intValue());
+ }
+ }
- };
+ };
- /**
- * Abstract function to be implemented by sub class, custom calculation on input aggregate.
- * @param collection Aggregate of values
- * @return calculated value.
- */
- public abstract long aggregateLongs(Collection<T> collection);
+ /**
+ * Abstract function to be implemented by sub class, custom calculation on input aggregate.
+ * @param collection Aggregate of values
+ * @return calculated value.
+ */
+ public abstract long aggregateLongs(Collection<T> collection);
- /**
- * Abstract function to be implemented by sub class, custom calculation on input aggregate.
- * @param collection Aggregate of values
- * @return calculated value.
- */
- public abstract double aggregateDoubles(Collection<T> collection);
+ /**
+ * Abstract function to be implemented by sub class, custom calculation on input aggregate.
+ * @param collection Aggregate of values
+ * @return calculated value.
+ */
+ public abstract double aggregateDoubles(Collection<T> collection);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/AbstractOutput.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/AbstractOutput.java b/library/src/main/java/com/datatorrent/lib/math/AbstractOutput.java
index bb387b4..9600021 100644
--- a/library/src/main/java/com/datatorrent/lib/math/AbstractOutput.java
+++ b/library/src/main/java/com/datatorrent/lib/math/AbstractOutput.java
@@ -18,9 +18,9 @@
*/
package com.datatorrent.lib.math;
-import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
/**
* Abstract base operator defining optional double/float/long/integer output port.
@@ -34,27 +34,27 @@ import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
*/
public abstract class AbstractOutput extends BaseOperator
{
- /**
- * Double type output.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<Double> doubleResult = new DefaultOutputPort<Double>();
+ /**
+ * Double type output.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<Double> doubleResult = new DefaultOutputPort<Double>();
- /**
- * Float type output.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<Float> floatResult = new DefaultOutputPort<Float>();
+ /**
+ * Float type output.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<Float> floatResult = new DefaultOutputPort<Float>();
- /**
- * Long type output.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<Long> longResult = new DefaultOutputPort<Long>();
+ /**
+ * Long type output.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<Long> longResult = new DefaultOutputPort<Long>();
- /**
- * Integer type output.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<Integer> integerResult = new DefaultOutputPort<Integer>();
+ /**
+ * Integer type output.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<Integer> integerResult = new DefaultOutputPort<Integer>();
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/AbstractXmlCartesianProduct.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/AbstractXmlCartesianProduct.java b/library/src/main/java/com/datatorrent/lib/math/AbstractXmlCartesianProduct.java
index a6b7ab2..10b6c15 100644
--- a/library/src/main/java/com/datatorrent/lib/math/AbstractXmlCartesianProduct.java
+++ b/library/src/main/java/com/datatorrent/lib/math/AbstractXmlCartesianProduct.java
@@ -18,17 +18,23 @@
*/
package com.datatorrent.lib.math;
-import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.DTThrowable;
-import com.datatorrent.lib.xml.AbstractXmlDOMOperator;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
-import javax.validation.constraints.NotNull;
-import javax.xml.xpath.*;
-import java.util.ArrayList;
-import java.util.List;
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.xml.AbstractXmlDOMOperator;
+import com.datatorrent.netlet.util.DTThrowable;
/**
* An operator that performs a cartesian product between different elements in a xml document.
@@ -146,7 +152,7 @@ public abstract class AbstractXmlCartesianProduct<T> extends AbstractXmlDOMOpera
try {
List<String> result = new ArrayList<String>();
for (CartesianProduct cartesianProduct : cartesianProducts) {
- cartesianProduct.product(document, result);
+ cartesianProduct.product(document, result);
}
processResult(result, tuple);
} catch (XPathExpressionException e) {
@@ -252,8 +258,11 @@ public abstract class AbstractXmlCartesianProduct<T> extends AbstractXmlDOMOpera
int balance = 1;
int i;
for (i = 1; (i < spec.length()) && (balance > 0); ++i) {
- if (spec.charAt(i) == ')') balance--;
- else if (spec.charAt(i) == '(') balance++;
+ if (spec.charAt(i) == ')') {
+ balance--;
+ } else if (spec.charAt(i) == '(') {
+ balance++;
+ }
}
if (i == spec.length()) {
estr = spec.substring(1, spec.length() - 1);
@@ -358,10 +367,10 @@ public abstract class AbstractXmlCartesianProduct<T> extends AbstractXmlDOMOpera
int chldEdDelIdx = productSpec.length() - 1;
int chldSepDelIdx;
if ((productSpec.charAt(chldStDelIdx) == '(') && (productSpec.charAt(chldEdDelIdx) == ')')
- && ((chldSepDelIdx = productSpec.indexOf(':')) != -1)) {
+ && ((chldSepDelIdx = productSpec.indexOf(':')) != -1)) {
String child1Spec = productSpec.substring(chldStDelIdx + 1, chldSepDelIdx);
String child2Spec = productSpec.substring(chldSepDelIdx + 1, chldEdDelIdx);
- parentElement = (SimplePathElement) pathElement;
+ parentElement = (SimplePathElement)pathElement;
childElement1 = pathElementFactory.getSpecable(child1Spec);
childElement2 = pathElementFactory.getSpecable(child2Spec);
}
@@ -419,7 +428,7 @@ public abstract class AbstractXmlCartesianProduct<T> extends AbstractXmlDOMOpera
private List<Node> getNodes(Document document, String path) throws XPathExpressionException
{
XPathExpression pathExpr = xpath.compile(path);
- NodeList nodeList = (NodeList) pathExpr.evaluate(document, XPathConstants.NODESET);
+ NodeList nodeList = (NodeList)pathExpr.evaluate(document, XPathConstants.NODESET);
List<Node> nodes = new ArrayList<Node>();
for (int i = 0; i < nodeList.getLength(); ++i) {
nodes.add(nodeList.item(i));
@@ -459,8 +468,11 @@ public abstract class AbstractXmlCartesianProduct<T> extends AbstractXmlDOMOpera
String delim = getDelim();
boolean first = true;
for (Node node : nodes) {
- if (!first) sb.append(delim);
- else first = false;
+ if (!first) {
+ sb.append(delim);
+ } else {
+ first = false;
+ }
sb.append(getValue(node));
}
return sb.toString();
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java b/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java
index 03c98d3..91cc9ba 100644
--- a/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java
+++ b/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java
@@ -39,7 +39,8 @@ public abstract class AbstractXmlKeyValueCartesianProduct<T> extends AbstractXml
}
@Override
- public boolean isValueNode(Node n) {
+ public boolean isValueNode(Node n)
+ {
return isTextContainerNode(n);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Average.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Average.java b/library/src/main/java/com/datatorrent/lib/math/Average.java
index d956e05..4dfdf1f 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Average.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Average.java
@@ -44,77 +44,77 @@ import com.datatorrent.lib.util.BaseNumberValueOperator;
*/
public class Average<V extends Number> extends BaseNumberValueOperator<V>
{
- /**
- * Input port that takes a number.
- */
- public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
- {
- /**
- * Computes sum and count with each tuple
- */
- @Override
- public void process(V tuple)
- {
- sums += tuple.doubleValue();
- counts++;
- }
- };
+ /**
+ * Input port that takes a number.
+ */
+ public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
+ {
+ /**
+ * Computes sum and count with each tuple
+ */
+ @Override
+ public void process(V tuple)
+ {
+ sums += tuple.doubleValue();
+ counts++;
+ }
+ };
- /**
- * Output port that emits average as a number.
- */
- public final transient DefaultOutputPort<V> average = new DefaultOutputPort<V>();
+ /**
+ * Output port that emits average as a number.
+ */
+ public final transient DefaultOutputPort<V> average = new DefaultOutputPort<V>();
- protected double sums = 0;
- protected long counts = 0;
+ protected double sums = 0;
+ protected long counts = 0;
- /**
- * Emit average.
- */
- @Override
- public void endWindow()
- {
- // May want to send out only if count != 0
- if (counts != 0) {
- average.emit(getAverage());
- }
- sums = 0;
- counts = 0;
- }
+ /**
+ * Emit average.
+ */
+ @Override
+ public void endWindow()
+ {
+ // May want to send out only if count != 0
+ if (counts != 0) {
+ average.emit(getAverage());
+ }
+ sums = 0;
+ counts = 0;
+ }
- /**
- * Calculate average based on number type.
- */
- @SuppressWarnings("unchecked")
- public V getAverage()
- {
- if (counts == 0) {
- return null;
- }
- V num = getValue(sums);
- Number val;
- switch (getType()) {
- case DOUBLE:
- val = new Double(num.doubleValue() / counts);
- break;
- case INTEGER:
- int icount = (int) (num.intValue() / counts);
- val = new Integer(icount);
- break;
- case FLOAT:
- val = new Float(num.floatValue() / counts);
- break;
- case LONG:
- val = new Long(num.longValue() / counts);
- break;
- case SHORT:
- short scount = (short) (num.shortValue() / counts);
- val = new Short(scount);
- break;
- default:
- val = new Double(num.doubleValue() / counts);
- break;
- }
- return (V) val;
- }
+ /**
+ * Calculate average based on number type.
+ */
+ @SuppressWarnings("unchecked")
+ public V getAverage()
+ {
+ if (counts == 0) {
+ return null;
+ }
+ V num = getValue(sums);
+ Number val;
+ switch (getType()) {
+ case DOUBLE:
+ val = new Double(num.doubleValue() / counts);
+ break;
+ case INTEGER:
+ int icount = (int)(num.intValue() / counts);
+ val = new Integer(icount);
+ break;
+ case FLOAT:
+ val = new Float(num.floatValue() / counts);
+ break;
+ case LONG:
+ val = new Long(num.longValue() / counts);
+ break;
+ case SHORT:
+ short scount = (short)(num.shortValue() / counts);
+ val = new Short(scount);
+ break;
+ default:
+ val = new Double(num.doubleValue() / counts);
+ break;
+ }
+ return (V)val;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/AverageKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/AverageKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/AverageKeyVal.java
index e9e7e40..e443780 100644
--- a/library/src/main/java/com/datatorrent/lib/math/AverageKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/AverageKeyVal.java
@@ -18,15 +18,17 @@
*/
package com.datatorrent.lib.math;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.mutable.MutableDouble;
+import org.apache.commons.lang.mutable.MutableLong;
+
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
import com.datatorrent.lib.util.KeyValPair;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.commons.lang.mutable.MutableDouble;
-import org.apache.commons.lang.mutable.MutableLong;
/**
*
@@ -57,84 +59,87 @@ import org.apache.commons.lang.mutable.MutableLong;
*/
public class AverageKeyVal<K> extends BaseNumberKeyValueOperator<K, Number>
{
- // Aggregate sum of all values seen for a key.
- protected HashMap<K, MutableDouble> sums = new HashMap<K, MutableDouble>();
-
- // Count of number of values seen for key.
- protected HashMap<K, MutableLong> counts = new HashMap<K, MutableLong>();
-
- /**
- * Input port that takes a key value pair.
- */
- public final transient DefaultInputPort<KeyValPair<K, ? extends Number>> data = new DefaultInputPort<KeyValPair<K, ? extends Number>>()
- {
- /**
- * Adds the values for each key, counts the number of occurrences of each
- * key and computes the average.
- */
- @Override
- public void process(KeyValPair<K, ? extends Number> tuple)
- {
- K key = tuple.getKey();
- if (!doprocessKey(key)) {
- return;
- }
- MutableDouble val = sums.get(key);
- if (val == null) {
- val = new MutableDouble(tuple.getValue().doubleValue());
- } else {
- val.add(tuple.getValue().doubleValue());
- }
- sums.put(cloneKey(key), val);
+ // Aggregate sum of all values seen for a key.
+ protected HashMap<K, MutableDouble> sums = new HashMap<K, MutableDouble>();
+
+ // Count of number of values seen for key.
+ protected HashMap<K, MutableLong> counts = new HashMap<K, MutableLong>();
+
+ /**
+ * Input port that takes a key value pair.
+ */
+ public final transient DefaultInputPort<KeyValPair<K, ? extends Number>> data = new DefaultInputPort<KeyValPair<K, ? extends Number>>()
+ {
+ /**
+ * Adds the values for each key, counts the number of occurrences of each
+ * key and computes the average.
+ */
+ @Override
+ public void process(KeyValPair<K, ? extends Number> tuple)
+ {
+ K key = tuple.getKey();
+ if (!doprocessKey(key)) {
+ return;
+ }
+ MutableDouble val = sums.get(key);
+ if (val == null) {
+ val = new MutableDouble(tuple.getValue().doubleValue());
+ } else {
+ val.add(tuple.getValue().doubleValue());
+ }
+ sums.put(cloneKey(key), val);
+
+ MutableLong count = counts.get(key);
+ if (count == null) {
+ count = new MutableLong(0);
+ counts.put(cloneKey(key), count);
+ }
+ count.increment();
+ }
+ };
+
+ /**
+ * Double average output port.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<KeyValPair<K, Double>> doubleAverage =
+ new DefaultOutputPort<KeyValPair<K, Double>>();
- MutableLong count = counts.get(key);
- if (count == null) {
- count = new MutableLong(0);
- counts.put(cloneKey(key), count);
- }
- count.increment();
- }
- };
+ /**
+ * Integer average output port.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<KeyValPair<K, Integer>> intAverage =
+ new DefaultOutputPort<KeyValPair<K, Integer>>();
- /**
- * Double average output port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<KeyValPair<K, Double>> doubleAverage = new DefaultOutputPort<KeyValPair<K, Double>>();
-
- /**
- * Integer average output port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<KeyValPair<K, Integer>> intAverage = new DefaultOutputPort<KeyValPair<K, Integer>>();
-
- /**
- * Long average output port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<KeyValPair<K, Long>> longAverage = new DefaultOutputPort<KeyValPair<K, Long>>();
+ /**
+ * Long average output port.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<KeyValPair<K, Long>> longAverage =
+ new DefaultOutputPort<KeyValPair<K, Long>>();
- /**
- * Emits average for each key in end window. Data is computed during process
- * on input port Clears the internal data before return.
- */
- @Override
- public void endWindow()
- {
- for (Map.Entry<K, MutableDouble> e : sums.entrySet()) {
- K key = e.getKey();
- double d = e.getValue().doubleValue();
- if (doubleAverage.isConnected()) {
- doubleAverage.emit(new KeyValPair<K, Double>(key, d / counts.get(key).doubleValue()));
- }
- if (intAverage.isConnected()) {
- intAverage.emit(new KeyValPair<K, Integer>(key, (int) d));
- }
- if (longAverage.isConnected()) {
- longAverage.emit(new KeyValPair<K, Long>(key, (long) d));
- }
- }
- sums.clear();
- counts.clear();
- }
+ /**
+ * Emits average for each key in end window. Data is computed during process
+ * on input port Clears the internal data before return.
+ */
+ @Override
+ public void endWindow()
+ {
+ for (Map.Entry<K, MutableDouble> e : sums.entrySet()) {
+ K key = e.getKey();
+ double d = e.getValue().doubleValue();
+ if (doubleAverage.isConnected()) {
+ doubleAverage.emit(new KeyValPair<K, Double>(key, d / counts.get(key).doubleValue()));
+ }
+ if (intAverage.isConnected()) {
+ intAverage.emit(new KeyValPair<K, Integer>(key, (int)d));
+ }
+ if (longAverage.isConnected()) {
+ longAverage.emit(new KeyValPair<K, Long>(key, (long)d));
+ }
+ }
+ sums.clear();
+ counts.clear();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Change.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Change.java b/library/src/main/java/com/datatorrent/lib/math/Change.java
index 628839f..57bad6b 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Change.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Change.java
@@ -61,57 +61,57 @@ import com.datatorrent.lib.util.BaseNumberValueOperator;
public class Change<V extends Number> extends BaseNumberValueOperator<V>
{
/**
- * Input data port that takes a number.
- */
- public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
- {
- /**
- * Process each key, compute change or percent, and emit it.
- */
- @Override
- public void process(V tuple)
- {
- if (baseValue != 0) { // Avoid divide by zero, Emit an error tuple?
- double cval = tuple.doubleValue() - baseValue;
- change.emit(getValue(cval));
- percent.emit((cval / baseValue) * 100);
- }
- }
- };
+ * Input data port that takes a number.
+ */
+ public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
+ {
+ /**
+ * Process each key, compute change or percent, and emit it.
+ */
+ @Override
+ public void process(V tuple)
+ {
+ if (baseValue != 0) { // Avoid divide by zero, Emit an error tuple?
+ double cval = tuple.doubleValue() - baseValue;
+ change.emit(getValue(cval));
+ percent.emit((cval / baseValue) * 100);
+ }
+ }
+ };
/**
- * Input port that takes a number It stores the value for base comparison.
- */
- public final transient DefaultInputPort<V> base = new DefaultInputPort<V>()
- {
- /**
- * Process each key to store the value. If same key appears again update
- * with latest value.
- */
- @Override
- public void process(V tuple)
- {
- if (tuple.doubleValue() != 0.0) { // Avoid divide by zero, Emit an error
- // tuple?
- baseValue = tuple.doubleValue();
- }
- }
- };
-
- /**
- * Output port that emits change in value compared to base value.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<V> change = new DefaultOutputPort<V>();
-
- /**
- * Output port that emits percent change in data value compared to base value.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<Double> percent = new DefaultOutputPort<Double>();
-
- /**
- * baseValue is a state full field. It is retained across windows.
- */
- private double baseValue = 0;
+ * Input port that takes a number It stores the value for base comparison.
+ */
+ public final transient DefaultInputPort<V> base = new DefaultInputPort<V>()
+ {
+ /**
+ * Process each key to store the value. If same key appears again update
+ * with latest value.
+ */
+ @Override
+ public void process(V tuple)
+ {
+ if (tuple.doubleValue() != 0.0) { // Avoid divide by zero, Emit an error
+ // tuple?
+ baseValue = tuple.doubleValue();
+ }
+ }
+ };
+
+ /**
+ * Output port that emits change in value compared to base value.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<V> change = new DefaultOutputPort<V>();
+
+ /**
+ * Output port that emits percent change in data value compared to base value.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<Double> percent = new DefaultOutputPort<Double>();
+
+ /**
+ * baseValue is a state full field. It is retained across windows.
+ */
+ private double baseValue = 0;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/ChangeAlert.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeAlert.java b/library/src/main/java/com/datatorrent/lib/math/ChangeAlert.java
index 01a040d..3c48016 100644
--- a/library/src/main/java/com/datatorrent/lib/math/ChangeAlert.java
+++ b/library/src/main/java/com/datatorrent/lib/math/ChangeAlert.java
@@ -53,66 +53,66 @@ import com.datatorrent.lib.util.KeyValPair;
*/
public class ChangeAlert<V extends Number> extends BaseNumberValueOperator<V>
{
- /**
- * Input port that takes in a number.
- */
- public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
- {
- /**
- * Process each key, compute change or percent, and emit it. If we get 0 as
- * tuple next will be skipped.
- */
- @Override
- public void process(V tuple)
- {
- double tval = tuple.doubleValue();
- if (baseValue == 0) { // Avoid divide by zero, Emit an error tuple?
- baseValue = tval;
- return;
- }
- double change = tval - baseValue;
- double percent = (change / baseValue) * 100;
- if (percent < 0.0) {
- percent = 0.0 - percent;
- }
- if (percent > percentThreshold) {
- KeyValPair<V, Double> kv = new KeyValPair<V, Double>(cloneKey(tuple),
- percent);
- alert.emit(kv);
- }
- baseValue = tval;
- }
- };
+ /**
+ * Input port that takes in a number.
+ */
+ public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
+ {
+ /**
+ * Process each key, compute change or percent, and emit it. If we get 0 as
+ * tuple next will be skipped.
+ */
+ @Override
+ public void process(V tuple)
+ {
+ double tval = tuple.doubleValue();
+ if (baseValue == 0) { // Avoid divide by zero, Emit an error tuple?
+ baseValue = tval;
+ return;
+ }
+ double change = tval - baseValue;
+ double percent = (change / baseValue) * 100;
+ if (percent < 0.0) {
+ percent = 0.0 - percent;
+ }
+ if (percent > percentThreshold) {
+ KeyValPair<V, Double> kv = new KeyValPair<V, Double>(cloneKey(tuple),
+ percent);
+ alert.emit(kv);
+ }
+ baseValue = tval;
+ }
+ };
- /**
- * Output port which emits a key value pair.
- */
- public final transient DefaultOutputPort<KeyValPair<V, Double>> alert = new DefaultOutputPort<KeyValPair<V, Double>>();
+ /**
+ * Output port which emits a key value pair.
+ */
+ public final transient DefaultOutputPort<KeyValPair<V, Double>> alert = new DefaultOutputPort<KeyValPair<V, Double>>();
- /**
- * baseValue is a state full field. It is retained across windows
- */
- private double baseValue = 0;
- @Min(1)
- private double percentThreshold = 0.0;
+ /**
+ * baseValue is a state full field. It is retained across windows
+ */
+ private double baseValue = 0;
+ @Min(1)
+ private double percentThreshold = 0.0;
- /**
- * getter function for threshold value
- *
- * @return threshold value
- */
- @Min(1)
- public double getPercentThreshold()
- {
- return percentThreshold;
- }
+ /**
+ * getter function for threshold value
+ *
+ * @return threshold value
+ */
+ @Min(1)
+ public double getPercentThreshold()
+ {
+ return percentThreshold;
+ }
- /**
- * setter function for threshold value
- */
- public void setPercentThreshold(double d)
- {
- percentThreshold = d;
- }
+ /**
+ * setter function for threshold value
+ */
+ public void setPercentThreshold(double d)
+ {
+ percentThreshold = d;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java
index 43e098f..b0d2e77 100644
--- a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java
@@ -52,78 +52,78 @@ import com.datatorrent.lib.util.KeyValPair;
* @since 0.3.3
*/
public class ChangeAlertKeyVal<K, V extends Number> extends
- BaseNumberKeyValueOperator<K, V>
+ BaseNumberKeyValueOperator<K, V>
{
- /**
- * Base map is a StateFull field. It is retained across windows
- */
- private HashMap<K, MutableDouble> basemap = new HashMap<K, MutableDouble>();
+ /**
+ * Base map is a stateful field. It is retained across windows.
+ */
+ private HashMap<K, MutableDouble> basemap = new HashMap<K, MutableDouble>();
- /**
- * Input data port that takes a key value pair.
- */
- public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
- {
- /**
- * Process each key, compute change or percent, and emit it.
- */
- @Override
- public void process(KeyValPair<K, V> tuple)
- {
- K key = tuple.getKey();
- double tval = tuple.getValue().doubleValue();
- MutableDouble val = basemap.get(key);
- if (!doprocessKey(key)) {
- return;
- }
- if (val == null) { // Only process keys that are in the basemap
- val = new MutableDouble(tval);
- basemap.put(cloneKey(key), val);
- return;
- }
- double change = tval - val.doubleValue();
- double percent = (change / val.doubleValue()) * 100;
- if (percent < 0.0) {
- percent = 0.0 - percent;
- }
- if (percent > percentThreshold) {
- KeyValPair<V, Double> dmap = new KeyValPair<V, Double>(
- cloneValue(tuple.getValue()), percent);
- KeyValPair<K, KeyValPair<V, Double>> otuple = new KeyValPair<K, KeyValPair<V, Double>>(
- cloneKey(key), dmap);
- alert.emit(otuple);
- }
- val.setValue(tval);
- }
- };
+ /**
+ * Input data port that takes a key value pair.
+ */
+ public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
+ {
+ /**
+ * Process each key, compute change or percent, and emit it.
+ */
+ @Override
+ public void process(KeyValPair<K, V> tuple)
+ {
+ K key = tuple.getKey();
+ double tval = tuple.getValue().doubleValue();
+ MutableDouble val = basemap.get(key);
+ if (!doprocessKey(key)) {
+ return;
+ }
+ if (val == null) { // Only process keys that are in the basemap
+ val = new MutableDouble(tval);
+ basemap.put(cloneKey(key), val);
+ return;
+ }
+ double change = tval - val.doubleValue();
+ double percent = (change / val.doubleValue()) * 100;
+ if (percent < 0.0) {
+ percent = 0.0 - percent;
+ }
+ if (percent > percentThreshold) {
+ KeyValPair<V, Double> dmap = new KeyValPair<V, Double>(
+ cloneValue(tuple.getValue()), percent);
+ KeyValPair<K, KeyValPair<V, Double>> otuple = new KeyValPair<K, KeyValPair<V, Double>>(
+ cloneKey(key), dmap);
+ alert.emit(otuple);
+ }
+ val.setValue(tval);
+ }
+ };
- /**
- * Key,Percent Change output port.
- */
- public final transient DefaultOutputPort<KeyValPair<K, KeyValPair<V, Double>>> alert = new DefaultOutputPort<KeyValPair<K, KeyValPair<V, Double>>>();
+ /**
+ * Key,Percent Change output port.
+ */
+ public final transient DefaultOutputPort<KeyValPair<K, KeyValPair<V, Double>>> alert = new DefaultOutputPort<KeyValPair<K, KeyValPair<V, Double>>>();
- /**
- * Alert thresh hold percentage set by application.
- */
- @Min(1)
- private double percentThreshold = 0.0;
+ /**
+ * Alert threshold percentage set by application.
+ */
+ @Min(1)
+ private double percentThreshold = 0.0;
- /**
- * getter function for threshold value
- *
- * @return threshold value
- */
- @Min(1)
- public double getPercentThreshold()
- {
- return percentThreshold;
- }
+ /**
+ * getter function for threshold value
+ *
+ * @return threshold value
+ */
+ @Min(1)
+ public double getPercentThreshold()
+ {
+ return percentThreshold;
+ }
- /**
- * setter function for threshold value
- */
- public void setPercentThreshold(double d)
- {
- percentThreshold = d;
- }
+ /**
+ * setter function for threshold value
+ */
+ public void setPercentThreshold(double d)
+ {
+ percentThreshold = d;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java b/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java
index ebf16d1..e212a2d 100644
--- a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java
+++ b/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java
@@ -74,7 +74,7 @@ public class ChangeAlertMap<K, V extends Number> extends BaseNumberKeyValueOpera
continue;
}
double change = e.getValue().doubleValue() - val.doubleValue();
- double percent = (change/val.doubleValue())*100;
+ double percent = (change / val.doubleValue()) * 100;
if (percent < 0.0) {
percent = 0.0 - percent;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java
index 2e406ec..3f77052 100644
--- a/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java
@@ -54,8 +54,7 @@ import com.datatorrent.lib.util.KeyValPair;
* @tags change, key value
* @since 0.3.3
*/
-public class ChangeKeyVal<K, V extends Number> extends
- BaseNumberKeyValueOperator<K, V>
+public class ChangeKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K, V>
{
/**
* basemap is a stateful field. It is retained across windows
@@ -81,8 +80,7 @@ public class ChangeKeyVal<K, V extends Number> extends
if (bval != null) { // Only process keys that are in the basemap
double cval = tuple.getValue().doubleValue() - bval.doubleValue();
change.emit(new KeyValPair<K, V>(cloneKey(key), getValue(cval)));
- percent.emit(new KeyValPair<K, Double>(cloneKey(key), (cval / bval
- .doubleValue()) * 100));
+ percent.emit(new KeyValPair<K, Double>(cloneKey(key), (cval / bval.doubleValue()) * 100));
}
}
};
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java b/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java
index 0573e3e..66bd7da 100644
--- a/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java
+++ b/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java
@@ -18,13 +18,14 @@
*/
package com.datatorrent.lib.math;
+import java.util.HashMap;
+import java.util.Map;
+
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.lib.algo.MatchMap;
import com.datatorrent.lib.util.UnifierHashMap;
-import java.util.HashMap;
-import java.util.Map;
/**
* Operator compares based on the property "key", "value", and "compare".
@@ -86,13 +87,13 @@ public class CompareExceptMap<K, V extends Number> extends MatchMap<K, V>
/**
* Output port that emits a hashmap of matched tuples after comparison.
*/
- @OutputPortFieldAnnotation(optional=true)
+ @OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<HashMap<K, V>> compare = match;
/**
* Output port that emits a hashmap of non matching tuples after comparison.
*/
- @OutputPortFieldAnnotation(optional=true)
+ @OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<HashMap<K, V>> except = new DefaultOutputPort<HashMap<K, V>>()
{
@Override
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/CompareMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/CompareMap.java b/library/src/main/java/com/datatorrent/lib/math/CompareMap.java
index 540f756..3636207 100644
--- a/library/src/main/java/com/datatorrent/lib/math/CompareMap.java
+++ b/library/src/main/java/com/datatorrent/lib/math/CompareMap.java
@@ -18,10 +18,11 @@
*/
package com.datatorrent.lib.math;
+import java.util.HashMap;
+
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.lib.algo.MatchMap;
-import java.util.HashMap;
/**
* This operator compares tuples subclassed from Number based on the property "key", "value", and "cmp", and matching tuples are emitted.
@@ -78,8 +79,8 @@ import java.util.HashMap;
@Stateless
public class CompareMap<K, V extends Number> extends MatchMap<K,V>
{
- /**
- * Output port that emits a hashmap of matching number tuples after comparison.
- */
- public final transient DefaultOutputPort<HashMap<K, V>> compare = match;
+ /**
+ * Output port that emits a hashmap of matching number tuples after comparison.
+ */
+ public final transient DefaultOutputPort<HashMap<K, V>> compare = match;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java
index 64c5029..d593020 100644
--- a/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java
@@ -50,65 +50,65 @@ import com.datatorrent.lib.util.UnifierCountOccurKey;
public class CountKeyVal<K, V> extends BaseKeyValueOperator<K, V>
{
- /**
- * Key occurrence count map.
- */
- protected HashMap<K, MutableInt> counts = new HashMap<K, MutableInt>();
+ /**
+ * Key occurrence count map.
+ */
+ protected HashMap<K, MutableInt> counts = new HashMap<K, MutableInt>();
- /**
- * Input data port that takes key value pair.
- */
- public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
- {
- /**
- * For each tuple (a key value pair): Adds the values for each key, Counts
- * the number of occurrence of each key
- */
- @Override
- public void process(KeyValPair<K, V> tuple)
- {
- K key = tuple.getKey();
- MutableInt count = counts.get(key);
- if (count == null) {
- count = new MutableInt(0);
- counts.put(cloneKey(key), count);
- }
- count.increment();
- }
+ /**
+ * Input data port that takes key value pair.
+ */
+ public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
+ {
+ /**
+ * For each tuple (a key value pair), counts the number of occurrences of
+ * its key.
+ */
+ @Override
+ public void process(KeyValPair<K, V> tuple)
+ {
+ K key = tuple.getKey();
+ MutableInt count = counts.get(key);
+ if (count == null) {
+ count = new MutableInt(0);
+ counts.put(cloneKey(key), count);
+ }
+ count.increment();
+ }
- @Override
- public StreamCodec<KeyValPair<K, V>> getStreamCodec()
- {
- return getKeyValPairStreamCodec();
- }
- };
+ @Override
+ public StreamCodec<KeyValPair<K, V>> getStreamCodec()
+ {
+ return getKeyValPairStreamCodec();
+ }
+ };
- /**
- * Key, occurrence value pair output port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<KeyValPair<K, Integer>> count = new DefaultOutputPort<KeyValPair<K, Integer>>()
- {
- @Override
- public UnifierCountOccurKey<K> getUnifier()
- {
- return new UnifierCountOccurKey<K>();
- }
- };
+ /**
+ * Key, occurrence value pair output port.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<KeyValPair<K, Integer>> count = new DefaultOutputPort<KeyValPair<K, Integer>>()
+ {
+ @Override
+ public UnifierCountOccurKey<K> getUnifier()
+ {
+ return new UnifierCountOccurKey<K>();
+ }
+ };
- /**
- * Emits on all ports that are connected. Data is computed during process on
- * input port and endWindow just emits it for each key. Clears the internal
- * data if resetAtEndWindow is true.
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- @Override
- public void endWindow()
- {
- for (Map.Entry<K, MutableInt> e : counts.entrySet()) {
- count.emit(new KeyValPair(e.getKey(),
- new Integer(e.getValue().intValue())));
- }
- counts.clear();
- }
+ /**
+ * Emits on all ports that are connected. Data is computed during process on
+ * input port and endWindow just emits it for each key. Always clears the
+ * internal data once every key has been emitted.
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Override
+ public void endWindow()
+ {
+ for (Map.Entry<K, MutableInt> e : counts.entrySet()) {
+ count.emit(new KeyValPair(e.getKey(),
+ new Integer(e.getValue().intValue())));
+ }
+ counts.clear();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Division.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Division.java b/library/src/main/java/com/datatorrent/lib/math/Division.java
index 5bbb9a9..d05af18 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Division.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Division.java
@@ -20,10 +20,10 @@ package com.datatorrent.lib.math;
import java.util.ArrayList;
-import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
/**
* This operator does division metric on consecutive tuples on ports.
@@ -54,9 +54,9 @@ import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
*/
public class Division extends BaseOperator
{
- /**
- * Array to store numerator inputs during window.
- */
+ /**
+ * Array to store numerator inputs during window.
+ */
private ArrayList<Number> numer = new ArrayList<Number>();
/**
@@ -83,7 +83,7 @@ public class Division extends BaseOperator
if (loc > numer.size()) {
loc = numer.size();
}
- emit(numer.get(loc-1), denom.get(loc-1));
+ emit(numer.get(loc - 1), denom.get(loc - 1));
index++;
}
}
@@ -107,7 +107,7 @@ public class Division extends BaseOperator
if (loc > numer.size()) {
loc = numer.size();
}
- emit(numer.get(loc-1), denom.get(loc-1));
+ emit(numer.get(loc - 1), denom.get(loc - 1));
index++;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java b/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java
index 84b10b8..ddef880 100644
--- a/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java
+++ b/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java
@@ -68,35 +68,35 @@ public class ExceptMap<K, V extends Number> extends MatchMap<K, V>
/**
* Output port that emits non matching number tuples.
*/
- public final transient DefaultOutputPort<HashMap<K, V>> except = new DefaultOutputPort<HashMap<K, V>>()
- {
- @Override
- public Unifier<HashMap<K, V>> getUnifier()
- {
- return new UnifierHashMap<K, V>();
- }
- };
+ public final transient DefaultOutputPort<HashMap<K, V>> except = new DefaultOutputPort<HashMap<K, V>>()
+ {
+ @Override
+ public Unifier<HashMap<K, V>> getUnifier()
+ {
+ return new UnifierHashMap<K, V>();
+ }
+ };
- /**
- * Does nothing. Overrides base as call super.tupleMatched() would emit the
- * tuple
- *
- * @param tuple
- */
- @Override
- public void tupleMatched(Map<K, V> tuple)
- {
- }
+ /**
+ * Does nothing. Overrides the base implementation because calling
+ * super.tupleMatched() would emit the tuple.
+ *
+ * @param tuple
+ */
+ @Override
+ public void tupleMatched(Map<K, V> tuple)
+ {
+ }
- /**
- * Emits the tuple. Calls cloneTuple to get a copy, allowing users to override
- * in case objects are mutable
- *
- * @param tuple
- */
- @Override
- public void tupleNotMatched(Map<K, V> tuple)
- {
- except.emit(cloneTuple(tuple));
- }
+ /**
+ * Emits the tuple. Calls cloneTuple to get a copy, allowing users to override
+ * in case objects are mutable
+ *
+ * @param tuple
+ */
+ @Override
+ public void tupleNotMatched(Map<K, V> tuple)
+ {
+ except.emit(cloneTuple(tuple));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/LogicalCompare.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/LogicalCompare.java b/library/src/main/java/com/datatorrent/lib/math/LogicalCompare.java
index 0b7e036..41ce5a0 100644
--- a/library/src/main/java/com/datatorrent/lib/math/LogicalCompare.java
+++ b/library/src/main/java/com/datatorrent/lib/math/LogicalCompare.java
@@ -18,10 +18,10 @@
*/
package com.datatorrent.lib.math;
-import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.common.util.Pair;
/**
@@ -29,7 +29,7 @@ import com.datatorrent.common.util.Pair;
* <p>
* If the first value is equal to second value, then the pair is emitted on equalTo, greaterThanEqualTo, and lessThanEqualTo ports.
* If the first value is less than second value, then the pair is emitted on notEqualTo, lessThan and lessThanEqualTo ports.
- * If the first value is greater than second value, then the pair is emitted on notEqualTo, greaterThan and greaterThanEqualTo ports.
+ * If the first value is greater than second value, then the pair is emitted on notEqualTo, greaterThan and greaterThanEqualTo ports.
* This is a pass through operator.
* <br>
* StateFull : No, output is computed during current window. <br>
@@ -51,61 +51,61 @@ import com.datatorrent.common.util.Pair;
*/
@Stateless
public abstract class LogicalCompare<T extends Comparable<? super T>> extends
- BaseOperator
+ BaseOperator
{
- /**
- * Input port that takes a key, value pair for comparison.
- */
- public final transient DefaultInputPort<Pair<T, T>> input = new DefaultInputPort<Pair<T, T>>()
- {
- @Override
- public void process(Pair<T, T> tuple)
- {
- int i = tuple.first.compareTo(tuple.second);
- if (i > 0) {
- greaterThan.emit(tuple);
- greaterThanOrEqualTo.emit(tuple);
- notEqualTo.emit(tuple);
- } else if (i < 0) {
- lessThan.emit(tuple);
- lessThanOrEqualTo.emit(tuple);
- notEqualTo.emit(tuple);
- } else {
- equalTo.emit(tuple);
- lessThanOrEqualTo.emit(tuple);
- greaterThanOrEqualTo.emit(tuple);
- }
- }
+ /**
+ * Input port that takes a key, value pair for comparison.
+ */
+ public final transient DefaultInputPort<Pair<T, T>> input = new DefaultInputPort<Pair<T, T>>()
+ {
+ @Override
+ public void process(Pair<T, T> tuple)
+ {
+ int i = tuple.first.compareTo(tuple.second);
+ if (i > 0) {
+ greaterThan.emit(tuple);
+ greaterThanOrEqualTo.emit(tuple);
+ notEqualTo.emit(tuple);
+ } else if (i < 0) {
+ lessThan.emit(tuple);
+ lessThanOrEqualTo.emit(tuple);
+ notEqualTo.emit(tuple);
+ } else {
+ equalTo.emit(tuple);
+ lessThanOrEqualTo.emit(tuple);
+ greaterThanOrEqualTo.emit(tuple);
+ }
+ }
- };
+ };
- /**
- * Equal output port.
- */
- public final transient DefaultOutputPort<Pair<T, T>> equalTo = new DefaultOutputPort<Pair<T, T>>();
+ /**
+ * Equal output port.
+ */
+ public final transient DefaultOutputPort<Pair<T, T>> equalTo = new DefaultOutputPort<Pair<T, T>>();
- /**
- * Not Equal output port.
- */
- public final transient DefaultOutputPort<Pair<T, T>> notEqualTo = new DefaultOutputPort<Pair<T, T>>();
+ /**
+ * Not Equal output port.
+ */
+ public final transient DefaultOutputPort<Pair<T, T>> notEqualTo = new DefaultOutputPort<Pair<T, T>>();
- /**
- * Less than output port.
- */
- public final transient DefaultOutputPort<Pair<T, T>> lessThan = new DefaultOutputPort<Pair<T, T>>();
+ /**
+ * Less than output port.
+ */
+ public final transient DefaultOutputPort<Pair<T, T>> lessThan = new DefaultOutputPort<Pair<T, T>>();
- /**
- * Greater than output port.
- */
- public final transient DefaultOutputPort<Pair<T, T>> greaterThan = new DefaultOutputPort<Pair<T, T>>();
+ /**
+ * Greater than output port.
+ */
+ public final transient DefaultOutputPort<Pair<T, T>> greaterThan = new DefaultOutputPort<Pair<T, T>>();
- /**
- * Less than equal to output port.
- */
- public final transient DefaultOutputPort<Pair<T, T>> lessThanOrEqualTo = new DefaultOutputPort<Pair<T, T>>();
+ /**
+ * Less than or equal to output port.
+ */
+ public final transient DefaultOutputPort<Pair<T, T>> lessThanOrEqualTo = new DefaultOutputPort<Pair<T, T>>();
- /**
- * Greater than equal to output port.
- */
- public final transient DefaultOutputPort<Pair<T, T>> greaterThanOrEqualTo = new DefaultOutputPort<Pair<T, T>>();
+ /**
+ * Greater than or equal to output port.
+ */
+ public final transient DefaultOutputPort<Pair<T, T>> greaterThanOrEqualTo = new DefaultOutputPort<Pair<T, T>>();
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/LogicalCompareToConstant.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/LogicalCompareToConstant.java b/library/src/main/java/com/datatorrent/lib/math/LogicalCompareToConstant.java
index 5e98eae..659a287 100644
--- a/library/src/main/java/com/datatorrent/lib/math/LogicalCompareToConstant.java
+++ b/library/src/main/java/com/datatorrent/lib/math/LogicalCompareToConstant.java
@@ -18,10 +18,10 @@
*/
package com.datatorrent.lib.math;
-import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
/**
* This operator does a logical comparison of a constant with a tuple.
@@ -54,78 +54,78 @@ import com.datatorrent.api.annotation.Stateless;
*/
@Stateless
public class LogicalCompareToConstant<T extends Comparable<? super T>> extends
- BaseOperator
+ BaseOperator
{
- /**
- * Compare constant, set by application.
- */
- private T constant;
+ /**
+ * Compare constant, set by application.
+ */
+ private T constant;
- /**
- * Input port that takes a comparable to compare it with a constant.
- */
- public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
- {
- @Override
- public void process(T tuple)
- {
- int i = constant.compareTo(tuple);
- if (i > 0) {
- greaterThan.emit(tuple);
- greaterThanOrEqualTo.emit(tuple);
- notEqualTo.emit(tuple);
- } else if (i < 0) {
- lessThan.emit(tuple);
- lessThanOrEqualTo.emit(tuple);
- notEqualTo.emit(tuple);
- } else {
- equalTo.emit(tuple);
- lessThanOrEqualTo.emit(tuple);
- greaterThanOrEqualTo.emit(tuple);
- }
- }
+ /**
+ * Input port that takes a comparable to compare it with a constant.
+ */
+ public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+ {
+ @Override
+ public void process(T tuple)
+ {
+ int i = constant.compareTo(tuple);
+ if (i > 0) {
+ greaterThan.emit(tuple);
+ greaterThanOrEqualTo.emit(tuple);
+ notEqualTo.emit(tuple);
+ } else if (i < 0) {
+ lessThan.emit(tuple);
+ lessThanOrEqualTo.emit(tuple);
+ notEqualTo.emit(tuple);
+ } else {
+ equalTo.emit(tuple);
+ lessThanOrEqualTo.emit(tuple);
+ greaterThanOrEqualTo.emit(tuple);
+ }
+ }
- };
+ };
- /**
- * Equal output port.
- */
- public final transient DefaultOutputPort<T> equalTo = new DefaultOutputPort<T>();
+ /**
+ * Equal output port.
+ */
+ public final transient DefaultOutputPort<T> equalTo = new DefaultOutputPort<T>();
- /**
- * Not Equal output port.
- */
- public final transient DefaultOutputPort<T> notEqualTo = new DefaultOutputPort<T>();
+ /**
+ * Not Equal output port.
+ */
+ public final transient DefaultOutputPort<T> notEqualTo = new DefaultOutputPort<T>();
- /**
- * Less Than output port.
- */
- public final transient DefaultOutputPort<T> lessThan = new DefaultOutputPort<T>();
+ /**
+ * Less Than output port.
+ */
+ public final transient DefaultOutputPort<T> lessThan = new DefaultOutputPort<T>();
- /**
- * Greater than output port.
- */
- public final transient DefaultOutputPort<T> greaterThan = new DefaultOutputPort<T>();
- public final transient DefaultOutputPort<T> lessThanOrEqualTo = new DefaultOutputPort<T>();
- public final transient DefaultOutputPort<T> greaterThanOrEqualTo = new DefaultOutputPort<T>();
+ /**
+ * Greater than output port.
+ */
+ public final transient DefaultOutputPort<T> greaterThan = new DefaultOutputPort<T>();
+ public final transient DefaultOutputPort<T> lessThanOrEqualTo = new DefaultOutputPort<T>();
+ public final transient DefaultOutputPort<T> greaterThanOrEqualTo = new DefaultOutputPort<T>();
- /**
- * Set constant for comparison.
- *
- * @param constant
- * the constant to set
- */
- public void setConstant(T constant)
- {
- this.constant = constant;
- }
+ /**
+ * Set constant for comparison.
+ *
+ * @param constant
+ * the constant to set
+ */
+ public void setConstant(T constant)
+ {
+ this.constant = constant;
+ }
- /**
- * returns the value of constant
- */
- public T getConstant()
- {
- return constant;
- }
+ /**
+ * returns the value of constant
+ */
+ public T getConstant()
+ {
+ return constant;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Margin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Margin.java b/library/src/main/java/com/datatorrent/lib/math/Margin.java
index 1161ba8..94e15d6 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Margin.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Margin.java
@@ -50,92 +50,92 @@ import com.datatorrent.lib.util.BaseNumberValueOperator;
@OperatorAnnotation(partitionable = false)
public class Margin<V extends Number> extends BaseNumberValueOperator<V>
{
- /**
- * Sum of numerator values.
- */
- protected double nval = 0.0;
+ /**
+ * Sum of numerator values.
+ */
+ protected double nval = 0.0;
- /**
- * sum of denominator values.
- */
- protected double dval = 0.0;
+ /**
+ * sum of denominator values.
+ */
+ protected double dval = 0.0;
- /**
- * Flag to output margin as percentage.
- */
- protected boolean percent = false;
+ /**
+ * Flag to output margin as percentage.
+ */
+ protected boolean percent = false;
- /**
- * Numerator input port.
- */
- public final transient DefaultInputPort<V> numerator = new DefaultInputPort<V>()
- {
- /**
- * Adds to the numerator value
- */
- @Override
- public void process(V tuple)
- {
- nval += tuple.doubleValue();
- }
- };
+ /**
+ * Numerator input port.
+ */
+ public final transient DefaultInputPort<V> numerator = new DefaultInputPort<V>()
+ {
+ /**
+ * Adds to the numerator value
+ */
+ @Override
+ public void process(V tuple)
+ {
+ nval += tuple.doubleValue();
+ }
+ };
- /**
- * Denominator input port.
- */
- public final transient DefaultInputPort<V> denominator = new DefaultInputPort<V>()
- {
- /**
- * Adds to the denominator value
- */
- @Override
- public void process(V tuple)
- {
- dval += tuple.doubleValue();
- }
- };
+ /**
+ * Denominator input port.
+ */
+ public final transient DefaultInputPort<V> denominator = new DefaultInputPort<V>()
+ {
+ /**
+ * Adds to the denominator value
+ */
+ @Override
+ public void process(V tuple)
+ {
+ dval += tuple.doubleValue();
+ }
+ };
- /**
- * Output margin port.
- */
- public final transient DefaultOutputPort<V> margin = new DefaultOutputPort<V>();
+ /**
+ * Output margin port.
+ */
+ public final transient DefaultOutputPort<V> margin = new DefaultOutputPort<V>();
- /**
- * getter function for percent
- *
- * @return percent
- */
- public boolean getPercent()
- {
- return percent;
- }
+ /**
+ * getter function for percent
+ *
+ * @return percent
+ */
+ public boolean getPercent()
+ {
+ return percent;
+ }
- /**
- * setter function for percent
- *
- * @param val
- * sets percent
- */
- public void setPercent(boolean val)
- {
- percent = val;
- }
+ /**
+ * setter function for percent
+ *
+ * @param val
+ * sets percent
+ */
+ public void setPercent(boolean val)
+ {
+ percent = val;
+ }
- /**
- * Generates tuple emits it as long as denomitor is not 0 Clears internal data
- */
- @Override
- public void endWindow()
- {
- if (dval == 0) {
- return;
- }
- double val = 1 - (nval / dval);
- if (percent) {
- val = val * 100;
- }
- margin.emit(getValue(val));
- nval = 0.0;
- dval = 0.0;
- }
+ /**
+ * Emits the margin tuple and clears internal data, unless the denominator is 0, in which case it returns without doing either.
+ */
+ @Override
+ public void endWindow()
+ {
+ if (dval == 0) {
+ return;
+ }
+ double val = 1 - (nval / dval);
+ if (percent) {
+ val = val * 100;
+ }
+ margin.emit(getValue(val));
+ nval = 0.0;
+ dval = 0.0;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/MarginKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/MarginKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/MarginKeyVal.java
index 29d35bc..e3af508 100644
--- a/library/src/main/java/com/datatorrent/lib/math/MarginKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/MarginKeyVal.java
@@ -23,12 +23,11 @@ import java.util.Map;
import org.apache.commons.lang.mutable.MutableDouble;
-import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
+import com.datatorrent.lib.util.KeyValPair;
/**
*
@@ -52,135 +51,134 @@ import com.datatorrent.api.StreamCodec;
* @tags sum, division, numeric, key value
* @since 0.3.3
*/
-public class MarginKeyVal<K, V extends Number> extends
- BaseNumberKeyValueOperator<K, V>
+public class MarginKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K, V>
{
/**
- * Numerator input port that takes a key value pair.
- */
- public final transient DefaultInputPort<KeyValPair<K, V>> numerator = new DefaultInputPort<KeyValPair<K, V>>()
- {
- /**
- * Adds tuple to the numerator hash.
- */
- @Override
- public void process(KeyValPair<K, V> tuple)
- {
- addTuple(tuple, numerators);
- }
-
- /**
- * Set StreamCodec used for partitioning.
- */
- @Override
- public StreamCodec<KeyValPair<K, V>> getStreamCodec()
- {
- return getKeyValPairStreamCodec();
- }
- };
+ * Numerator input port that takes a key value pair.
+ */
+ public final transient DefaultInputPort<KeyValPair<K, V>> numerator = new DefaultInputPort<KeyValPair<K, V>>()
+ {
+ /**
+ * Adds tuple to the numerator hash.
+ */
+ @Override
+ public void process(KeyValPair<K, V> tuple)
+ {
+ addTuple(tuple, numerators);
+ }
+
+ /**
+ * Set StreamCodec used for partitioning.
+ */
+ @Override
+ public StreamCodec<KeyValPair<K, V>> getStreamCodec()
+ {
+ return getKeyValPairStreamCodec();
+ }
+ };
/**
- * Denominator input port that takes a key value pair.
- */
- public final transient DefaultInputPort<KeyValPair<K, V>> denominator = new DefaultInputPort<KeyValPair<K, V>>()
- {
- /**
- * Adds tuple to the denominator hash.
- */
- @Override
- public void process(KeyValPair<K, V> tuple)
- {
- addTuple(tuple, denominators);
- }
-
- /**
- * Set StreamCodec used for partitioning.
- */
- @Override
- public StreamCodec<KeyValPair<K, V>> getStreamCodec()
- {
- return getKeyValPairStreamCodec();
- }
- };
-
- /**
- * Adds the value for each key.
- *
- * @param tuple
- * @param map
- */
- public void addTuple(KeyValPair<K, V> tuple, Map<K, MutableDouble> map)
- {
- K key = tuple.getKey();
- if (!doprocessKey(key) || (tuple.getValue() == null)) {
- return;
- }
- MutableDouble val = map.get(key);
- if (val == null) {
- val = new MutableDouble(0.0);
- map.put(cloneKey(key), val);
- }
- val.add(tuple.getValue().doubleValue());
- }
+ * Denominator input port that takes a key value pair.
+ */
+ public final transient DefaultInputPort<KeyValPair<K, V>> denominator = new DefaultInputPort<KeyValPair<K, V>>()
+ {
+ /**
+ * Adds tuple to the denominator hash.
+ */
+ @Override
+ public void process(KeyValPair<K, V> tuple)
+ {
+ addTuple(tuple, denominators);
+ }
+
+ /**
+ * Set StreamCodec used for partitioning.
+ */
+ @Override
+ public StreamCodec<KeyValPair<K, V>> getStreamCodec()
+ {
+ return getKeyValPairStreamCodec();
+ }
+ };
+
+ /**
+ * Adds the value for each key.
+ *
+ * @param tuple
+ * @param map
+ */
+ public void addTuple(KeyValPair<K, V> tuple, Map<K, MutableDouble> map)
+ {
+ K key = tuple.getKey();
+ if (!doprocessKey(key) || (tuple.getValue() == null)) {
+ return;
+ }
+ MutableDouble val = map.get(key);
+ if (val == null) {
+ val = new MutableDouble(0.0);
+ map.put(cloneKey(key), val);
+ }
+ val.add(tuple.getValue().doubleValue());
+ }
/**
- * Output margin port that emits Key Value pairs.
- */
- public final transient DefaultOutputPort<KeyValPair<K, V>> margin = new DefaultOutputPort<KeyValPair<K, V>>();
-
- protected HashMap<K, MutableDouble> numerators = new HashMap<K, MutableDouble>();
- protected HashMap<K, MutableDouble> denominators = new HashMap<K, MutableDouble>();
- protected boolean percent = false;
-
- /**
- * getter function for percent
- *
- * @return percent
- */
- public boolean getPercent()
- {
- return percent;
- }
-
- /**
- * setter function for percent
- *
- * @param val
- * sets percent
- */
- public void setPercent(boolean val)
- {
- percent = val;
- }
-
- /**
- * Generates tuples for each key and emits them. Only keys that are in the
- * denominator are iterated on If the key is only in the numerator, it gets
- * ignored (cannot do divide by 0) Clears internal data
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public void endWindow()
- {
- Double val;
- for (Map.Entry<K, MutableDouble> e : denominators.entrySet()) {
- K key = e.getKey();
- MutableDouble nval = numerators.get(key);
- if (nval == null) {
- nval = new MutableDouble(0.0);
- } else {
- numerators.remove(key); // so that all left over keys can be reported
- }
- if (percent) {
- val = (1 - nval.doubleValue() / e.getValue().doubleValue()) * 100;
- } else {
- val = 1 - nval.doubleValue() / e.getValue().doubleValue();
- }
-
- margin.emit(new KeyValPair(key, getValue(val.doubleValue())));
- }
-
- numerators.clear();
- denominators.clear();
- }
+ * Output margin port that emits Key Value pairs.
+ */
+ public final transient DefaultOutputPort<KeyValPair<K, V>> margin = new DefaultOutputPort<KeyValPair<K, V>>();
+
+ protected HashMap<K, MutableDouble> numerators = new HashMap<K, MutableDouble>();
+ protected HashMap<K, MutableDouble> denominators = new HashMap<K, MutableDouble>();
+ protected boolean percent = false;
+
+ /**
+ * getter function for percent
+ *
+ * @return percent
+ */
+ public boolean getPercent()
+ {
+ return percent;
+ }
+
+ /**
+ * setter function for percent
+ *
+ * @param val
+ * sets percent
+ */
+ public void setPercent(boolean val)
+ {
+ percent = val;
+ }
+
+ /**
+ * Generates tuples for each key and emits them. Only keys that are in the
+ * denominator are iterated on. If the key is only in the numerator, it gets
+ * ignored (cannot divide by 0). Clears internal data.
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public void endWindow()
+ {
+ Double val;
+ for (Map.Entry<K, MutableDouble> e : denominators.entrySet()) {
+ K key = e.getKey();
+ MutableDouble nval = numerators.get(key);
+ if (nval == null) {
+ nval = new MutableDouble(0.0);
+ } else {
+ numerators.remove(key); // so that all left over keys can be reported
+ }
+ if (percent) {
+ val = (1 - nval.doubleValue() / e.getValue().doubleValue()) * 100;
+ } else {
+ val = 1 - nval.doubleValue() / e.getValue().doubleValue();
+ }
+
+ margin.emit(new KeyValPair(key, getValue(val.doubleValue())));
+ }
+
+ numerators.clear();
+ denominators.clear();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/MarginMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/MarginMap.java b/library/src/main/java/com/datatorrent/lib/math/MarginMap.java
index 2259d85..7ef1f81 100644
--- a/library/src/main/java/com/datatorrent/lib/math/MarginMap.java
+++ b/library/src/main/java/com/datatorrent/lib/math/MarginMap.java
@@ -18,13 +18,15 @@
*/
package com.datatorrent.lib.math;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.mutable.MutableDouble;
+
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
import com.datatorrent.lib.util.UnifierHashMap;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.commons.lang.mutable.MutableDouble;
/**
@@ -144,18 +146,16 @@ public class MarginMap<K, V extends Number> extends BaseNumberKeyValueOperator<K
{
HashMap<K, V> tuples = new HashMap<K, V>();
Double val;
- for (Map.Entry<K, MutableDouble> e: denominators.entrySet()) {
+ for (Map.Entry<K, MutableDouble> e : denominators.entrySet()) {
MutableDouble nval = numerators.get(e.getKey());
if (nval == null) {
nval = new MutableDouble(0.0);
- }
- else {
+ } else {
numerators.remove(e.getKey()); // so that all left over keys can be reported
}
if (percent) {
val = (1 - nval.doubleValue() / e.getValue().doubleValue()) * 100;
- }
- else {
+ } else {
val = 1 - nval.doubleValue() / e.getValue().doubleValue();
}
tuples.put(e.getKey(), getValue(val.doubleValue()));
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Max.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Max.java b/library/src/main/java/com/datatorrent/lib/math/Max.java
index e4171f6..8ec8b2f 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Max.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Max.java
@@ -64,8 +64,7 @@ public class Max<V extends Number> extends BaseNumberValueOperator<V> implements
if (!flag) {
high = tuple;
flag = true;
- }
- else if (high.doubleValue() < tuple.doubleValue()) {
+ } else if (high.doubleValue() < tuple.doubleValue()) {
high = tuple;
}
}
@@ -74,7 +73,7 @@ public class Max<V extends Number> extends BaseNumberValueOperator<V> implements
* Max value output port.
*/
public final transient DefaultOutputPort<V> max = new DefaultOutputPort<V>()
- {
+ {
@Override
public Unifier<V> getUnifier()
{
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/MaxKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/MaxKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/MaxKeyVal.java
index 58a947f..95a0a08 100644
--- a/library/src/main/java/com/datatorrent/lib/math/MaxKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/MaxKeyVal.java
@@ -21,12 +21,11 @@ package com.datatorrent.lib.math;
import java.util.HashMap;
import java.util.Map;
-import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
+import com.datatorrent.lib.util.KeyValPair;
/**
*
@@ -68,8 +67,7 @@ public class MaxKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K
if (val == null) {
val = tval;
highs.put(cloneKey(key), val);
- }
- else if (val.doubleValue() < tval.doubleValue()) {
+ } else if (val.doubleValue() < tval.doubleValue()) {
highs.put(key, tval);
}
}
@@ -97,7 +95,7 @@ public class MaxKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K
* Clears internal data. Node only works in windowed mode.
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
+ @Override
public void endWindow()
{
if (!highs.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Min.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Min.java b/library/src/main/java/com/datatorrent/lib/math/Min.java
index 244d990..4b3fa23 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Min.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Min.java
@@ -41,17 +41,17 @@ import com.datatorrent.lib.util.BaseNumberValueOperator;
*/
public class Min<V extends Number> extends BaseNumberValueOperator<V> implements Unifier<V>
{
- /**
- * Computed low value.
- */
+ /**
+ * Computed low value.
+ */
protected V low;
// transient field
protected boolean flag = false;
- /**
- * Input port that takes a number and compares to min and stores the new min.
- */
+ /**
+ * Input port that takes a number and compares to min and stores the new min.
+ */
public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
{
/**
@@ -73,8 +73,7 @@ public class Min<V extends Number> extends BaseNumberValueOperator<V> implements
if (!flag) {
low = tuple;
flag = true;
- }
- else if (low.doubleValue() > tuple.doubleValue()) {
+ } else if (low.doubleValue() > tuple.doubleValue()) {
low = tuple;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/MinKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/MinKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/MinKeyVal.java
index 5ea710b..2468239 100644
--- a/library/src/main/java/com/datatorrent/lib/math/MinKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/MinKeyVal.java
@@ -21,12 +21,11 @@ package com.datatorrent.lib.math;
import java.util.HashMap;
import java.util.Map;
-import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
+import com.datatorrent.lib.util.KeyValPair;
/**
*
@@ -48,9 +47,9 @@ import com.datatorrent.api.StreamCodec;
*/
public class MinKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K, V>
{
- /**
- * Input port which takes a key vaue pair and updates the value for each key if there is a new min.
- */
+ /**
+ * Input port which takes a key value pair and updates the value for each key if there is a new min.
+ */
public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
{
/**
@@ -67,8 +66,7 @@ public class MinKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K
V val = mins.get(key);
if (val == null) {
mins.put(cloneKey(key), tval);
- }
- else if (val.doubleValue() > tval.doubleValue()) {
+ } else if (val.doubleValue() > tval.doubleValue()) {
mins.put(key, tval);
}
}
@@ -94,7 +92,7 @@ public class MinKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K
* Clears internal data. Node only works in windowed mode.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
- @Override
+ @Override
public void endWindow()
{
if (!mins.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java b/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java
index c1c70d3..dd56f7f 100644
--- a/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java
+++ b/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java
@@ -18,10 +18,10 @@
*/
package com.datatorrent.lib.math;
-import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
/**
* Multiplies input tuple (Number) by the value of property "multiplier" and emits the result on respective ports.
@@ -51,9 +51,9 @@ import com.datatorrent.api.annotation.Stateless;
@Stateless
public class MultiplyByConstant extends BaseOperator
{
- /**
- * Input number port.
- */
+ /**
+ * Input number port.
+ */
public final transient DefaultInputPort<Number> input = new DefaultInputPort<Number>()
{
@Override
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Quotient.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Quotient.java b/library/src/main/java/com/datatorrent/lib/math/Quotient.java
index d55f205..ed08e86 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Quotient.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Quotient.java
@@ -47,63 +47,63 @@ import com.datatorrent.lib.util.BaseNumberValueOperator;
@OperatorAnnotation(partitionable = false)
public class Quotient<V extends Number> extends BaseNumberValueOperator<V>
{
- protected double nval = 0.0;
- protected double dval = 0.0;
- int mult_by = 1;
+ protected double nval = 0.0;
+ protected double dval = 0.0;
+ int mult_by = 1;
- /**
- * Numerator values input port.
- */
- public final transient DefaultInputPort<V> numerator = new DefaultInputPort<V>()
- {
- /**
- * Adds to the numerator value
- */
- @Override
- public void process(V tuple)
- {
- nval += tuple.doubleValue();
- }
- };
+ /**
+ * Numerator values input port.
+ */
+ public final transient DefaultInputPort<V> numerator = new DefaultInputPort<V>()
+ {
+ /**
+ * Adds to the numerator value
+ */
+ @Override
+ public void process(V tuple)
+ {
+ nval += tuple.doubleValue();
+ }
+ };
- /**
- * Denominator values input port.
- */
- public final transient DefaultInputPort<V> denominator = new DefaultInputPort<V>()
- {
- /**
- * Adds to the denominator value
- */
- @Override
- public void process(V tuple)
- {
- dval += tuple.doubleValue();
- }
- };
+ /**
+ * Denominator values input port.
+ */
+ public final transient DefaultInputPort<V> denominator = new DefaultInputPort<V>()
+ {
+ /**
+ * Adds to the denominator value
+ */
+ @Override
+ public void process(V tuple)
+ {
+ dval += tuple.doubleValue();
+ }
+ };
- /**
- * Quotient output port.
- */
- public final transient DefaultOutputPort<V> quotient = new DefaultOutputPort<V>();
+ /**
+ * Quotient output port.
+ */
+ public final transient DefaultOutputPort<V> quotient = new DefaultOutputPort<V>();
- public void setMult_by(int i)
- {
- mult_by = i;
- }
+ public void setMult_by(int i)
+ {
+ mult_by = i;
+ }
- /**
- * Generates tuple emits it as long as denominator is not 0. Clears internal
- * data
- */
- @Override
- public void endWindow()
- {
- if (dval == 0) {
- return;
- }
- double val = (nval / dval) * mult_by;
- quotient.emit(getValue(val));
- nval = 0.0;
- dval = 0.0;
- }
+ /**
+ * Generates a tuple and emits it as long as the denominator is not 0. Clears
+ * internal data.
+ */
+ @Override
+ public void endWindow()
+ {
+ if (dval == 0) {
+ return;
+ }
+ double val = (nval / dval) * mult_by;
+ quotient.emit(getValue(val));
+ nval = 0.0;
+ dval = 0.0;
+ }
}