You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/05/18 20:42:03 UTC
[13/22] incubator-apex-malhar git commit: APEXMALHAR-2095 removed
checkstyle violations of malhar library module
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/QuotientMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/QuotientMap.java b/library/src/main/java/com/datatorrent/lib/math/QuotientMap.java
index c114bad..a10fe95 100644
--- a/library/src/main/java/com/datatorrent/lib/math/QuotientMap.java
+++ b/library/src/main/java/com/datatorrent/lib/math/QuotientMap.java
@@ -23,11 +23,11 @@ import java.util.Map;
import javax.validation.constraints.Min;
-import com.datatorrent.api.annotation.OperatorAnnotation;
import org.apache.commons.lang.mutable.MutableDouble;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
/**
@@ -63,175 +63,175 @@ import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
*/
@OperatorAnnotation(partitionable = false)
public class QuotientMap<K, V extends Number> extends
- BaseNumberKeyValueOperator<K, V>
+ BaseNumberKeyValueOperator<K, V>
{
- /**
- * Numerator key/sum value map.
- */
- protected HashMap<K, MutableDouble> numerators = new HashMap<K, MutableDouble>();
-
- /**
- * Denominator key/sum value map.
- */
- protected HashMap<K, MutableDouble> denominators = new HashMap<K, MutableDouble>();
-
- /**
- * Count occurrence of keys if set to true.
- */
- boolean countkey = false;
-
- /**
- * Quotient multiply by value.
- */
- int mult_by = 1;
-
- /**
- * Numerator input port.
- */
- public final transient DefaultInputPort<Map<K, V>> numerator = new DefaultInputPort<Map<K, V>>()
- {
- /**
- * Added tuple to the numerator hash
- */
- @Override
- public void process(Map<K, V> tuple)
- {
- addTuple(tuple, numerators);
- }
- };
-
- /**
- * Denominator input port.
- */
- public final transient DefaultInputPort<Map<K, V>> denominator = new DefaultInputPort<Map<K, V>>()
- {
- /**
- * Added tuple to the denominator hash
- */
- @Override
- public void process(Map<K, V> tuple)
- {
- addTuple(tuple, denominators);
- }
- };
-
- /**
- * Quotient output port.
- */
- public final transient DefaultOutputPort<HashMap<K, Double>> quotient = new DefaultOutputPort<HashMap<K, Double>>();
-
- /**
- * Add tuple to nval/dval map.
- *
- * @param tuple
- * key/value map on input port.
- * @param map
- * key/summed value map.
- */
- public void addTuple(Map<K, V> tuple, Map<K, MutableDouble> map)
- {
- for (Map.Entry<K, V> e : tuple.entrySet()) {
- addEntry(e.getKey(), e.getValue(), map);
- }
- }
-
- /**
- * Add/Update entry to key/sum value map.
- *
- * @param key
- * name.
- * @param value
- * value for key.
- * @param map
- * numerator/denominator key/sum map.
- */
- public void addEntry(K key, V value, Map<K, MutableDouble> map)
- {
- if (!doprocessKey(key) || (value == null)) {
- return;
- }
- MutableDouble val = map.get(key);
- if (val == null) {
- if (countkey) {
- val = new MutableDouble(1.00);
- } else {
- val = new MutableDouble(value.doubleValue());
- }
- } else {
- if (countkey) {
- val.increment();
- } else {
- val.add(value.doubleValue());
- }
- }
- map.put(cloneKey(key), val);
- }
-
- /**
- * getter for mult_by
- *
- * @return mult_by
- */
-
- @Min(0)
- public int getMult_by()
- {
- return mult_by;
- }
-
- /**
- * getter for countkey
- *
- * @return countkey
- */
- public boolean getCountkey()
- {
- return countkey;
- }
-
- /**
- * Setter for mult_by
- *
- * @param i
- */
- public void setMult_by(int i)
- {
- mult_by = i;
- }
-
- /**
- * setter for countkey
- *
- * @param i
- * sets countkey
- */
- public void setCountkey(boolean i)
- {
- countkey = i;
- }
-
- /**
- * Generates tuples for each key and emits them. Only keys that are in the
- * denominator are iterated on If the key is only in the numerator, it gets
- * ignored (cannot do divide by 0) Clears internal data
- */
- @Override
- public void endWindow()
- {
- HashMap<K, Double> tuples = new HashMap<K, Double>();
- for (Map.Entry<K, MutableDouble> e : denominators.entrySet()) {
- MutableDouble nval = numerators.get(e.getKey());
- if (nval == null) {
- tuples.put(e.getKey(), new Double(0.0));
- } else {
- tuples.put(e.getKey(), new Double((nval.doubleValue() / e.getValue()
- .doubleValue()) * mult_by));
- }
- }
- if (!tuples.isEmpty()) {
- quotient.emit(tuples);
- }
- numerators.clear();
- denominators.clear();
- }
+ /**
+ * Numerator key/sum value map.
+ */
+ protected HashMap<K, MutableDouble> numerators = new HashMap<K, MutableDouble>();
+
+ /**
+ * Denominator key/sum value map.
+ */
+ protected HashMap<K, MutableDouble> denominators = new HashMap<K, MutableDouble>();
+
+ /**
+ * Count occurrence of keys if set to true.
+ */
+ boolean countkey = false;
+
+ /**
+ * Quotient multiply by value.
+ */
+ int mult_by = 1;
+
+ /**
+ * Numerator input port.
+ */
+ public final transient DefaultInputPort<Map<K, V>> numerator = new DefaultInputPort<Map<K, V>>()
+ {
+ /**
+ * Adds the tuple to the numerator map.
+ */
+ @Override
+ public void process(Map<K, V> tuple)
+ {
+ addTuple(tuple, numerators);
+ }
+ };
+
+ /**
+ * Denominator input port.
+ */
+ public final transient DefaultInputPort<Map<K, V>> denominator = new DefaultInputPort<Map<K, V>>()
+ {
+ /**
+ * Adds the tuple to the denominator map.
+ */
+ @Override
+ public void process(Map<K, V> tuple)
+ {
+ addTuple(tuple, denominators);
+ }
+ };
+
+ /**
+ * Quotient output port.
+ */
+ public final transient DefaultOutputPort<HashMap<K, Double>> quotient = new DefaultOutputPort<HashMap<K, Double>>();
+
+ /**
+ * Add tuple to nval/dval map.
+ *
+ * @param tuple
+ * key/value map on input port.
+ * @param map
+ * key/summed value map.
+ */
+ public void addTuple(Map<K, V> tuple, Map<K, MutableDouble> map)
+ {
+ for (Map.Entry<K, V> e : tuple.entrySet()) {
+ addEntry(e.getKey(), e.getValue(), map);
+ }
+ }
+
+ /**
+ * Add/Update entry to key/sum value map.
+ *
+ * @param key
+ * name.
+ * @param value
+ * value for key.
+ * @param map
+ * numerator/denominator key/sum map.
+ */
+ public void addEntry(K key, V value, Map<K, MutableDouble> map)
+ {
+ if (!doprocessKey(key) || (value == null)) {
+ return;
+ }
+ MutableDouble val = map.get(key);
+ if (val == null) {
+ if (countkey) {
+ val = new MutableDouble(1.00);
+ } else {
+ val = new MutableDouble(value.doubleValue());
+ }
+ } else {
+ if (countkey) {
+ val.increment();
+ } else {
+ val.add(value.doubleValue());
+ }
+ }
+ map.put(cloneKey(key), val);
+ }
+
+ /**
+ * getter for mult_by
+ *
+ * @return mult_by
+ */
+
+ @Min(0)
+ public int getMult_by()
+ {
+ return mult_by;
+ }
+
+ /**
+ * getter for countkey
+ *
+ * @return countkey
+ */
+ public boolean getCountkey()
+ {
+ return countkey;
+ }
+
+ /**
+ * Setter for mult_by
+ *
+ * @param i factor the quotient is multiplied by
+ */
+ public void setMult_by(int i)
+ {
+ mult_by = i;
+ }
+
+ /**
+ * setter for countkey
+ *
+ * @param i
+ * sets countkey
+ */
+ public void setCountkey(boolean i)
+ {
+ countkey = i;
+ }
+
+ /**
+ * Generates tuples for each key and emits them. Only keys that are in the
+ * denominator are iterated on. If the key is only in the numerator, it gets
+ * ignored (cannot divide by 0). Clears internal data.
+ */
+ @Override
+ public void endWindow()
+ {
+ HashMap<K, Double> tuples = new HashMap<K, Double>();
+ for (Map.Entry<K, MutableDouble> e : denominators.entrySet()) {
+ MutableDouble nval = numerators.get(e.getKey());
+ if (nval == null) {
+ tuples.put(e.getKey(), new Double(0.0));
+ } else {
+ tuples.put(e.getKey(), new Double((nval.doubleValue() / e.getValue()
+ .doubleValue()) * mult_by));
+ }
+ }
+ if (!tuples.isEmpty()) {
+ quotient.emit(tuples);
+ }
+ numerators.clear();
+ denominators.clear();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Range.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Range.java b/library/src/main/java/com/datatorrent/lib/math/Range.java
index 9a7ab7a..ad54d4d 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Range.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Range.java
@@ -43,62 +43,62 @@ import com.datatorrent.lib.util.UnifierRange;
*/
public class Range<V extends Number> extends BaseNumberValueOperator<V>
{
- /**
- * Highest value on input port.
- */
- protected V high = null;
+ /**
+ * Highest value on input port.
+ */
+ protected V high = null;
- /**
- * Lowest value on input port.
- */
- protected V low = null;
+ /**
+ * Lowest value on input port.
+ */
+ protected V low = null;
- /**
- * Input data port.
- */
- public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
- {
- /**
- * Process each tuple to compute new high and low
- */
- @Override
- public void process(V tuple)
- {
- if ((low == null) || (low.doubleValue() > tuple.doubleValue())) {
- low = tuple;
- }
+ /**
+ * Input data port.
+ */
+ public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
+ {
+ /**
+ * Process each tuple to compute new high and low
+ */
+ @Override
+ public void process(V tuple)
+ {
+ if ((low == null) || (low.doubleValue() > tuple.doubleValue())) {
+ low = tuple;
+ }
- if ((high == null) || (high.doubleValue() < tuple.doubleValue())) {
- high = tuple;
- }
- }
- };
+ if ((high == null) || (high.doubleValue() < tuple.doubleValue())) {
+ high = tuple;
+ }
+ }
+ };
- /**
- * Output range port, which emits high low unifier operator.
- */
- public final transient DefaultOutputPort<HighLow<V>> range = new DefaultOutputPort<HighLow<V>>()
- {
- @Override
- public Unifier<HighLow<V>> getUnifier()
- {
- return new UnifierRange<V>();
- }
- };
+ /**
+ * Output range port; emits the high/low pair and uses UnifierRange as its unifier.
+ */
+ public final transient DefaultOutputPort<HighLow<V>> range = new DefaultOutputPort<HighLow<V>>()
+ {
+ @Override
+ public Unifier<HighLow<V>> getUnifier()
+ {
+ return new UnifierRange<V>();
+ }
+ };
- /**
- * Emits the range. If no tuple was received in the window, no emit is done
- * Clears the internal data before return
- */
- @Override
- public void endWindow()
- {
- if ((low != null) && (high != null)) {
- HighLow tuple = new HighLow(getValue(high.doubleValue()),
- getValue(low.doubleValue()));
- range.emit(tuple);
- }
- high = null;
- low = null;
- }
+ /**
+ * Emits the range. If no tuple was received in the window, no emit is done.
+ * Clears the internal data before return.
+ */
+ @Override
+ public void endWindow()
+ {
+ if ((low != null) && (high != null)) {
+ HighLow tuple = new HighLow(getValue(high.doubleValue()),
+ getValue(low.doubleValue()));
+ range.emit(tuple);
+ }
+ high = null;
+ low = null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/RangeKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/RangeKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/RangeKeyVal.java
index a006415..241f482 100644
--- a/library/src/main/java/com/datatorrent/lib/math/RangeKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/RangeKeyVal.java
@@ -21,15 +21,14 @@ package com.datatorrent.lib.math;
import java.util.HashMap;
import java.util.Map;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.StreamCodec;
import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
import com.datatorrent.lib.util.HighLow;
import com.datatorrent.lib.util.KeyValPair;
import com.datatorrent.lib.util.UnifierKeyValRange;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.StreamCodec;
-
/**
* This operator emits the range for each key at the end of window.
* <p>
@@ -50,83 +49,82 @@ import com.datatorrent.api.StreamCodec;
* @tags range, number, comparison, key value
* @since 0.3.3
*/
-public class RangeKeyVal<K, V extends Number> extends
- BaseNumberKeyValueOperator<K, V>
+public class RangeKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K, V>
{
- /**
- * key/high value map.
- */
- protected HashMap<K, V> high = new HashMap<K, V>();
+ /**
+ * key/high value map.
+ */
+ protected HashMap<K, V> high = new HashMap<K, V>();
- /**
- * key/low value map.
- */
- protected HashMap<K, V> low = new HashMap<K, V>();
+ /**
+ * key/low value map.
+ */
+ protected HashMap<K, V> low = new HashMap<K, V>();
- /**
- * Input port that takes a key value pair.
- */
- public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
- {
- /**
- * Process each key and computes new high and low.
- */
- @Override
- public void process(KeyValPair<K, V> tuple)
- {
- K key = tuple.getKey();
- if (!doprocessKey(key) || (tuple.getValue() == null)) {
- return;
- }
- V val = low.get(key);
- V eval = tuple.getValue();
- if ((val == null) || (val.doubleValue() > eval.doubleValue())) {
- low.put(cloneKey(key), eval);
- }
+ /**
+ * Input port that takes a key value pair.
+ */
+ public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
+ {
+ /**
+ * Processes each key and computes new high and low.
+ */
+ @Override
+ public void process(KeyValPair<K, V> tuple)
+ {
+ K key = tuple.getKey();
+ if (!doprocessKey(key) || (tuple.getValue() == null)) {
+ return;
+ }
+ V val = low.get(key);
+ V eval = tuple.getValue();
+ if ((val == null) || (val.doubleValue() > eval.doubleValue())) {
+ low.put(cloneKey(key), eval);
+ }
- val = high.get(key);
- if ((val == null) || (val.doubleValue() < eval.doubleValue())) {
- high.put(cloneKey(key), eval);
- }
- }
+ val = high.get(key);
+ if ((val == null) || (val.doubleValue() < eval.doubleValue())) {
+ high.put(cloneKey(key), eval);
+ }
+ }
- @Override
- public StreamCodec<KeyValPair<K, V>> getStreamCodec()
- {
- return getKeyValPairStreamCodec();
- }
- };
+ @Override
+ public StreamCodec<KeyValPair<K, V>> getStreamCodec()
+ {
+ return getKeyValPairStreamCodec();
+ }
+ };
- /**
- * Range output port to send out the high low range.
- */
- public final transient DefaultOutputPort<KeyValPair<K, HighLow<V>>> range = new DefaultOutputPort<KeyValPair<K, HighLow<V>>>()
- {
- @Override
- public Unifier<KeyValPair<K, HighLow<V>>> getUnifier()
- {
- return new UnifierKeyValRange<K,V>();
- }
- };
+ /**
+ * Range output port to send out the high low range.
+ */
+ public final transient DefaultOutputPort<KeyValPair<K, HighLow<V>>> range = new DefaultOutputPort<KeyValPair<K, HighLow<V>>>()
+ {
+ @Override
+ public Unifier<KeyValPair<K, HighLow<V>>> getUnifier()
+ {
+ return new UnifierKeyValRange<K,V>();
+ }
+ };
- /**
- * Emits range for each key. If no data is received, no emit is done Clears
- * the internal data before return
- */
- @Override
- public void endWindow()
- {
- for (Map.Entry<K, V> e : high.entrySet()) {
- range.emit(new KeyValPair<K, HighLow<V>>(e.getKey(), new HighLow(e
- .getValue(), low.get(e.getKey()))));
- }
- clearCache();
- }
+ /**
+ * Emits range for each key. If no data is received, no emit is done. Clears
+ * the internal data before return.
+ */
+ @Override
+ public void endWindow()
+ {
+ for (Map.Entry<K, V> e : high.entrySet()) {
+ range.emit(new KeyValPair<K, HighLow<V>>(e.getKey(), new HighLow(e
+ .getValue(), low.get(e.getKey()))));
+ }
+ clearCache();
+ }
- public void clearCache()
- {
- high.clear();
- low.clear();
- }
+ public void clearCache()
+ {
+ high.clear();
+ low.clear();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java b/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java
index 24f09f9..286d72e 100644
--- a/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java
+++ b/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java
@@ -18,10 +18,10 @@
*/
package com.datatorrent.lib.math;
-import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.common.util.BaseOperator;
/**
* Calculate the running average of the input numbers and emit it at the end of the window.
@@ -46,70 +46,70 @@ import com.datatorrent.api.annotation.OperatorAnnotation;
@OperatorAnnotation(partitionable = false)
public class RunningAverage extends BaseOperator
{
- /**
- * Computed average.
- */
- double average;
+ /**
+ * Computed average.
+ */
+ double average;
- /**
- * Number of values on input port.
- */
- long count;
+ /**
+ * Number of values on input port.
+ */
+ long count;
- /**
- * Input number port.
- */
- public final transient DefaultInputPort<Number> input = new DefaultInputPort<Number>()
- {
- @Override
- public void process(Number tuple)
- {
- double sum = (count * average) + tuple.doubleValue();
- count++;
- average = sum / count;
- }
- };
+ /**
+ * Input number port.
+ */
+ public final transient DefaultInputPort<Number> input = new DefaultInputPort<Number>()
+ {
+ @Override
+ public void process(Number tuple)
+ {
+ double sum = (count * average) + tuple.doubleValue();
+ count++;
+ average = sum / count;
+ }
+ };
- /**
- * Double average output port.
- */
- public final transient DefaultOutputPort<Double> doubleAverage = new DefaultOutputPort<Double>();
+ /**
+ * Double average output port.
+ */
+ public final transient DefaultOutputPort<Double> doubleAverage = new DefaultOutputPort<Double>();
- /**
- * Float average output port.
- */
- public final transient DefaultOutputPort<Float> floatAverage = new DefaultOutputPort<Float>();
+ /**
+ * Float average output port.
+ */
+ public final transient DefaultOutputPort<Float> floatAverage = new DefaultOutputPort<Float>();
- /**
- * Long average output port.
- */
- public final transient DefaultOutputPort<Long> longAverage = new DefaultOutputPort<Long>();
+ /**
+ * Long average output port.
+ */
+ public final transient DefaultOutputPort<Long> longAverage = new DefaultOutputPort<Long>();
- /**
- * Integer average output port.
- */
- public final transient DefaultOutputPort<Integer> integerAverage = new DefaultOutputPort<Integer>();
+ /**
+ * Integer average output port.
+ */
+ public final transient DefaultOutputPort<Integer> integerAverage = new DefaultOutputPort<Integer>();
- /**
- * End window operator override.
- */
- @Override
- public void endWindow()
- {
- if (doubleAverage.isConnected()) {
- doubleAverage.emit(average);
- }
+ /**
+ * End of window: emits the running average on every connected output port.
+ */
+ @Override
+ public void endWindow()
+ {
+ if (doubleAverage.isConnected()) {
+ doubleAverage.emit(average);
+ }
- if (floatAverage.isConnected()) {
- floatAverage.emit((float) average);
- }
+ if (floatAverage.isConnected()) {
+ floatAverage.emit((float)average);
+ }
- if (longAverage.isConnected()) {
- longAverage.emit((long) average);
- }
+ if (longAverage.isConnected()) {
+ longAverage.emit((long)average);
+ }
- if (integerAverage.isConnected()) {
- integerAverage.emit((int) average);
- }
- }
+ if (integerAverage.isConnected()) {
+ integerAverage.emit((int)average);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Sigma.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Sigma.java b/library/src/main/java/com/datatorrent/lib/math/Sigma.java
index 7355238..6bfb9cf 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Sigma.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Sigma.java
@@ -18,10 +18,10 @@
*/
package com.datatorrent.lib.math;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-
import java.util.Collection;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+
/**
* Adds incoming tuple to the state and emits the result of each addition on the respective ports.
* <p>
@@ -47,27 +47,27 @@ import java.util.Collection;
@OperatorAnnotation(partitionable = false)
public class Sigma<T extends Number> extends AbstractAggregateCalc<T>
{
- @Override
- public long aggregateLongs(Collection<T> collection)
- {
- long l = 0;
+ @Override
+ public long aggregateLongs(Collection<T> collection)
+ {
+ long l = 0;
- for (Number n : collection) {
- l += n.longValue();
- }
+ for (Number n : collection) {
+ l += n.longValue();
+ }
- return l;
- }
+ return l;
+ }
- @Override
- public double aggregateDoubles(Collection<T> collection)
- {
- double d = 0;
+ @Override
+ public double aggregateDoubles(Collection<T> collection)
+ {
+ double d = 0;
- for (Number n : collection) {
- d += n.doubleValue();
- }
+ for (Number n : collection) {
+ d += n.doubleValue();
+ }
- return d;
- }
+ return d;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Sum.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Sum.java b/library/src/main/java/com/datatorrent/lib/math/Sum.java
index 38be2fc..0f5e64f 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Sum.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Sum.java
@@ -56,204 +56,204 @@ import com.datatorrent.lib.util.UnifierSumNumber;
* @since 0.3.3
*/
public class Sum<V extends Number> extends BaseNumberValueOperator<V> implements
- Unifier<V>
+ Unifier<V>
{
- /**
- * Sum value.
- */
- protected double sums = 0;
+ /**
+ * Sum value.
+ */
+ protected double sums = 0;
- /**
- * Input tuple processed flag.
- */
- protected boolean tupleAvailable = false;
+ /**
+ * Input tuple processed flag.
+ */
+ protected boolean tupleAvailable = false;
- /**
- * Accumulate sum flag.
- */
- protected boolean cumulative = false;
+ /**
+ * Accumulate sum flag.
+ */
+ protected boolean cumulative = false;
- /**
- * Input port to receive data. It computes sum and count for each tuple.
- */
- public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
- {
- /**
- * Computes sum and count with each tuple
- */
- @Override
- public void process(V tuple)
- {
- Sum.this.process(tuple);
- tupleAvailable = true;
- }
- };
+ /**
+ * Input port to receive data. It adds each tuple to the running sum.
+ */
+ public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
+ {
+ /**
+ * Adds the tuple to the sum and marks a tuple as available
+ */
+ @Override
+ public void process(V tuple)
+ {
+ Sum.this.process(tuple);
+ tupleAvailable = true;
+ }
+ };
- /**
- * Unifier process override.
- */
- @Override
- public void process(V tuple)
- {
- sums += tuple.doubleValue();
- tupleAvailable = true; // also need to set here for Unifier
- }
+ /**
+ * Unifier process override.
+ */
+ @Override
+ public void process(V tuple)
+ {
+ sums += tuple.doubleValue();
+ tupleAvailable = true; // also need to set here for Unifier
+ }
- /**
- * Output sum port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<V> sum = new DefaultOutputPort<V>()
- {
- @Override
- public Unifier<V> getUnifier()
- {
- UnifierSumNumber<V> ret = new UnifierSumNumber<V>();
- ret.setVType(getType());
- return ret;
- }
- };
+ /**
+ * Output sum port.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<V> sum = new DefaultOutputPort<V>()
+ {
+ @Override
+ public Unifier<V> getUnifier()
+ {
+ UnifierSumNumber<V> ret = new UnifierSumNumber<V>();
+ ret.setVType(getType());
+ return ret;
+ }
+ };
- /**
- * Output double sum port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<Double> sumDouble = new DefaultOutputPort<Double>()
- {
- @Override
- public Unifier<Double> getUnifier()
- {
- UnifierSumNumber<Double> ret = new UnifierSumNumber<Double>();
- ret.setType(Double.class);
- return ret;
- }
- };
+ /**
+ * Output double sum port.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<Double> sumDouble = new DefaultOutputPort<Double>()
+ {
+ @Override
+ public Unifier<Double> getUnifier()
+ {
+ UnifierSumNumber<Double> ret = new UnifierSumNumber<Double>();
+ ret.setType(Double.class);
+ return ret;
+ }
+ };
- /**
- * Output integer sum port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<Integer> sumInteger = new DefaultOutputPort<Integer>()
- {
- @Override
- public Unifier<Integer> getUnifier()
- {
- UnifierSumNumber<Integer> ret = new UnifierSumNumber<Integer>();
- ret.setType(Integer.class);
- return ret;
- }
- };
+ /**
+ * Output integer sum port.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<Integer> sumInteger = new DefaultOutputPort<Integer>()
+ {
+ @Override
+ public Unifier<Integer> getUnifier()
+ {
+ UnifierSumNumber<Integer> ret = new UnifierSumNumber<Integer>();
+ ret.setType(Integer.class);
+ return ret;
+ }
+ };
- /**
- * Output Long sum port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<Long> sumLong = new DefaultOutputPort<Long>()
- {
- @Override
- public Unifier<Long> getUnifier()
- {
- UnifierSumNumber<Long> ret = new UnifierSumNumber<Long>();
- ret.setType(Long.class);
- return ret;
- }
- };
+ /**
+ * Output Long sum port.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<Long> sumLong = new DefaultOutputPort<Long>()
+ {
+ @Override
+ public Unifier<Long> getUnifier()
+ {
+ UnifierSumNumber<Long> ret = new UnifierSumNumber<Long>();
+ ret.setType(Long.class);
+ return ret;
+ }
+ };
- /**
- * Output short sum port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<Short> sumShort = new DefaultOutputPort<Short>()
- {
- @Override
- public Unifier<Short> getUnifier()
- {
- UnifierSumNumber<Short> ret = new UnifierSumNumber<Short>();
- ret.setType(Short.class);
- return ret;
- }
- };
+ /**
+ * Output short sum port.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<Short> sumShort = new DefaultOutputPort<Short>()
+ {
+ @Override
+ public Unifier<Short> getUnifier()
+ {
+ UnifierSumNumber<Short> ret = new UnifierSumNumber<Short>();
+ ret.setType(Short.class);
+ return ret;
+ }
+ };
- /**
- * Output float sum port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<Float> sumFloat = new DefaultOutputPort<Float>()
- {
- @Override
- public Unifier<Float> getUnifier()
- {
- UnifierSumNumber<Float> ret = new UnifierSumNumber<Float>();
- ret.setType(Float.class);
- return ret;
- }
- };
+ /**
+ * Output float sum port.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<Float> sumFloat = new DefaultOutputPort<Float>()
+ {
+ @Override
+ public Unifier<Float> getUnifier()
+ {
+ UnifierSumNumber<Float> ret = new UnifierSumNumber<Float>();
+ ret.setType(Float.class);
+ return ret;
+ }
+ };
- /**
- * Redis server output port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<Map<Integer, Integer>> redisport = new DefaultOutputPort<Map<Integer, Integer>>();
+ /**
+ * Redis server output port.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<Map<Integer, Integer>> redisport = new DefaultOutputPort<Map<Integer, Integer>>();
- /**
- * Check if sum has to be cumulative.
- *
- * @return cumulative flag
- */
- public boolean isCumulative()
- {
- return cumulative;
- }
+ /**
+ * Check if sum has to be cumulative.
+ *
+ * @return cumulative flag
+ */
+ public boolean isCumulative()
+ {
+ return cumulative;
+ }
- /**
- * Set cumulative flag.
- *
- * @param cumulative
- * flag
- */
- public void setCumulative(boolean cumulative)
- {
- this.cumulative = cumulative;
- }
+ /**
+ * Set cumulative flag.
+ *
+ * @param cumulative
+ * flag
+ */
+ public void setCumulative(boolean cumulative)
+ {
+ this.cumulative = cumulative;
+ }
- /**
- * Emits sum and count if ports are connected
- */
- @Override
- public void endWindow()
- {
- if (doEmit()) {
- sum.emit(getValue(sums));
- sumDouble.emit(sums);
- sumInteger.emit((int) sums);
- sumLong.emit((long) sums);
- sumShort.emit((short) sums);
- sumFloat.emit((float) sums);
- tupleAvailable = false;
- Map<Integer, Integer> redis = new HashMap<Integer, Integer>();
- redis.put(1, (int) sums);
- redisport.emit(redis);
- }
- clearCache();
- }
+ /**
+ * Emits the sum on the output ports if a tuple was processed since the last emit
+ */
+ @Override
+ public void endWindow()
+ {
+ if (doEmit()) {
+ sum.emit(getValue(sums));
+ sumDouble.emit(sums);
+ sumInteger.emit((int)sums);
+ sumLong.emit((long)sums);
+ sumShort.emit((short)sums);
+ sumFloat.emit((float)sums);
+ tupleAvailable = false;
+ Map<Integer, Integer> redis = new HashMap<Integer, Integer>();
+ redis.put(1, (int)sums);
+ redisport.emit(redis);
+ }
+ clearCache();
+ }
- /**
- * Clears the cache making this operator stateless on window boundary
- */
- private void clearCache()
- {
- if (!cumulative) {
- sums = 0;
- }
- }
+ /**
+ * Resets the sum at the window boundary unless the cumulative flag is set
+ */
+ private void clearCache()
+ {
+ if (!cumulative) {
+ sums = 0;
+ }
+ }
- /**
- * Decides whether emit has to be done in this window on port "sum"
- *
- * @return true is sum port is connected
- */
- private boolean doEmit()
- {
- return tupleAvailable;
- }
+ /**
+ * Decides whether emit has to be done in this window.
+ *
+ * @return true if at least one tuple has been processed since the last emit
+ */
+ private boolean doEmit()
+ {
+ return tupleAvailable;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/SumCountMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/SumCountMap.java b/library/src/main/java/com/datatorrent/lib/math/SumCountMap.java
index 76242f4..c2d8465 100644
--- a/library/src/main/java/com/datatorrent/lib/math/SumCountMap.java
+++ b/library/src/main/java/com/datatorrent/lib/math/SumCountMap.java
@@ -18,16 +18,18 @@
*/
package com.datatorrent.lib.math;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.mutable.MutableDouble;
+import org.apache.commons.lang.mutable.MutableInt;
+
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
import com.datatorrent.lib.util.UnifierHashMapInteger;
import com.datatorrent.lib.util.UnifierHashMapSumKeys;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.commons.lang.mutable.MutableDouble;
-import org.apache.commons.lang.mutable.MutableInt;
/**
* Emits the sum and count of values for each key at the end of window.
@@ -58,244 +60,244 @@ import org.apache.commons.lang.mutable.MutableInt;
* @since 0.3.3
*/
public class SumCountMap<K, V extends Number> extends
- BaseNumberKeyValueOperator<K, V>
+ BaseNumberKeyValueOperator<K, V>
{
- /**
- * Key/double sum map.
- */
- protected HashMap<K, MutableDouble> sums = new HashMap<K, MutableDouble>();
+ /**
+ * Key/double sum map.
+ */
+ protected HashMap<K, MutableDouble> sums = new HashMap<K, MutableDouble>();
+
+ /**
+ * Key/integer count map.
+ */
+ protected HashMap<K, MutableInt> counts = new HashMap<K, MutableInt>();
- /**
- * Key/integer sum map.
- */
- protected HashMap<K, MutableInt> counts = new HashMap<K, MutableInt>();
+ /**
+ * Cumulative sum flag.
+ */
+ protected boolean cumulative = false;
- /**
- * Cumulative sum flag.
- */
- protected boolean cumulative = false;
+ /**
+ * Input port that takes a map. It adds the values for each key and counts the number of occurrences for each key.
+ */
+ public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>()
+ {
+ /**
+ * For each tuple (a map of key/value pairs), adds the values for each
+ * key and counts the number of occurrences of each key.
+ */
+ @Override
+ public void process(Map<K, V> tuple)
+ {
+ for (Map.Entry<K, V> e : tuple.entrySet()) {
+ K key = e.getKey();
+ if (!doprocessKey(key)) {
+ continue;
+ }
+ if (sum.isConnected()) {
+ MutableDouble val = sums.get(key);
+ if (val == null) {
+ val = new MutableDouble(e.getValue().doubleValue());
+ } else {
+ val.add(e.getValue().doubleValue());
+ }
+ sums.put(cloneKey(key), val);
+ }
+ if (SumCountMap.this.count.isConnected()) {
+ MutableInt count = counts.get(key);
+ if (count == null) {
+ count = new MutableInt(0);
+ counts.put(cloneKey(key), count);
+ }
+ count.increment();
+ }
+ }
+ }
+ };
- /**
- * Input port that takes a map. It adds the values for each key and counts the number of occurrences for each key.
- */
- public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>()
- {
- /**
- * For each tuple (a HashMap of keys,val pairs) Adds the values for each
- * key, Counts the number of occurrences of each key
- */
- @Override
- public void process(Map<K, V> tuple)
- {
- for (Map.Entry<K, V> e : tuple.entrySet()) {
- K key = e.getKey();
- if (!doprocessKey(key)) {
- continue;
- }
- if (sum.isConnected()) {
- MutableDouble val = sums.get(key);
- if (val == null) {
- val = new MutableDouble(e.getValue().doubleValue());
- } else {
- val.add(e.getValue().doubleValue());
- }
- sums.put(cloneKey(key), val);
- }
- if (SumCountMap.this.count.isConnected()) {
- MutableInt count = counts.get(key);
- if (count == null) {
- count = new MutableInt(0);
- counts.put(cloneKey(key), count);
- }
- count.increment();
- }
- }
- }
- };
+ /**
+ * Key,sum map output port.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<HashMap<K, V>> sum = new DefaultOutputPort<HashMap<K, V>>()
+ {
+ @Override
+ public Unifier<HashMap<K, V>> getUnifier()
+ {
+ return new UnifierHashMapSumKeys<K, V>();
+ }
+ };
- /**
- * Key,sum map output port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<HashMap<K, V>> sum = new DefaultOutputPort<HashMap<K, V>>()
- {
- @Override
- public Unifier<HashMap<K, V>> getUnifier()
- {
- return new UnifierHashMapSumKeys<K, V>();
- }
- };
+ /**
+ * Key,double sum map output port.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<HashMap<K, Double>> sumDouble = new DefaultOutputPort<HashMap<K, Double>>()
+ {
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public Unifier<HashMap<K, Double>> getUnifier()
+ {
+ UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Double>();
+ ret.setType(Double.class);
+ return ret;
+ }
+ };
- /**
- * Key,double sum map output port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<HashMap<K, Double>> sumDouble = new DefaultOutputPort<HashMap<K, Double>>()
- {
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public Unifier<HashMap<K, Double>> getUnifier()
- {
- UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Double>();
- ret.setType(Double.class);
- return ret;
- }
- };
+ /**
+ * Key,integer sum output port.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<HashMap<K, Integer>> sumInteger = new DefaultOutputPort<HashMap<K, Integer>>()
+ {
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public Unifier<HashMap<K, Integer>> getUnifier()
+ {
+ UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Integer>();
+ ret.setType(Integer.class);
+ return ret;
+ }
+ };
- /**
- * Key,integer sum output port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<HashMap<K, Integer>> sumInteger = new DefaultOutputPort<HashMap<K, Integer>>()
- {
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public Unifier<HashMap<K, Integer>> getUnifier()
- {
- UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Integer>();
- ret.setType(Integer.class);
- return ret;
- }
- };
-
/**
- * Key,long sum output port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<HashMap<K, Long>> sumLong = new DefaultOutputPort<HashMap<K, Long>>()
- {
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public Unifier<HashMap<K, Long>> getUnifier()
- {
- UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Long>();
- ret.setType(Long.class);
- return ret;
- }
- };
+ * Key,long sum output port.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<HashMap<K, Long>> sumLong = new DefaultOutputPort<HashMap<K, Long>>()
+ {
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public Unifier<HashMap<K, Long>> getUnifier()
+ {
+ UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Long>();
+ ret.setType(Long.class);
+ return ret;
+ }
+ };
/**
- * Key,short sum output port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<HashMap<K, Short>> sumShort = new DefaultOutputPort<HashMap<K, Short>>()
- {
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public Unifier<HashMap<K, Short>> getUnifier()
- {
- UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Short>();
- ret.setType(Short.class);
- return ret;
- }
- };
+ * Key,short sum output port.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<HashMap<K, Short>> sumShort = new DefaultOutputPort<HashMap<K, Short>>()
+ {
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public Unifier<HashMap<K, Short>> getUnifier()
+ {
+ UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Short>();
+ ret.setType(Short.class);
+ return ret;
+ }
+ };
/**
- * Key,float sum output port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<HashMap<K, Float>> sumFloat = new DefaultOutputPort<HashMap<K, Float>>()
- {
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public Unifier<HashMap<K, Float>> getUnifier()
- {
- UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Float>();
- ret.setType(Float.class);
- return ret;
- }
- };
+ * Key,float sum output port.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<HashMap<K, Float>> sumFloat = new DefaultOutputPort<HashMap<K, Float>>()
+ {
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public Unifier<HashMap<K, Float>> getUnifier()
+ {
+ UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Float>();
+ ret.setType(Float.class);
+ return ret;
+ }
+ };
/**
- * Key,integer sum output port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<HashMap<K, Integer>> count = new DefaultOutputPort<HashMap<K, Integer>>()
- {
- @Override
- public Unifier<HashMap<K, Integer>> getUnifier()
- {
- return new UnifierHashMapInteger<K>();
- }
- };
+ * Key,integer count output port.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<HashMap<K, Integer>> count = new DefaultOutputPort<HashMap<K, Integer>>()
+ {
+ @Override
+ public Unifier<HashMap<K, Integer>> getUnifier()
+ {
+ return new UnifierHashMapInteger<K>();
+ }
+ };
- /**
- * Get cumulative flag.
- *
- * @return cumulative flag
- */
- public boolean isCumulative()
- {
- return cumulative;
- }
+ /**
+ * Get cumulative flag.
+ *
+ * @return cumulative flag
+ */
+ public boolean isCumulative()
+ {
+ return cumulative;
+ }
- /**
- * set cumulative flag.
- *
- * @param cumulative
- * input flag
- */
- public void setCumulative(boolean cumulative)
- {
- this.cumulative = cumulative;
- }
+ /**
+ * set cumulative flag.
+ *
+ * @param cumulative
+ * input flag
+ */
+ public void setCumulative(boolean cumulative)
+ {
+ this.cumulative = cumulative;
+ }
- /**
- * Emits on all ports that are connected. Data is precomputed during process
- * on input port endWindow just emits it for each key Clears the internal data
- * before return
- */
- @Override
- public void endWindow()
- {
+ /**
+ * Emits on all ports that are connected. Data is precomputed during process
+ * on the input port; endWindow just emits it for each key and, unless the
+ * cumulative flag is set, clears the internal data before returning.
+ */
+ @Override
+ public void endWindow()
+ {
- // Should allow users to send each key as a separate tuple to load balance
- // This is an aggregate node, so load balancing would most likely not be
- // needed
+ // Should allow users to send each key as a separate tuple to load balance
+ // This is an aggregate node, so load balancing would most likely not be
+ // needed
- HashMap<K, V> tuples = new HashMap<K, V>();
- HashMap<K, Integer> ctuples = new HashMap<K, Integer>();
- HashMap<K, Double> dtuples = new HashMap<K, Double>();
- HashMap<K, Integer> ituples = new HashMap<K, Integer>();
- HashMap<K, Float> ftuples = new HashMap<K, Float>();
- HashMap<K, Long> ltuples = new HashMap<K, Long>();
- HashMap<K, Short> stuples = new HashMap<K, Short>();
+ HashMap<K, V> tuples = new HashMap<K, V>();
+ HashMap<K, Integer> ctuples = new HashMap<K, Integer>();
+ HashMap<K, Double> dtuples = new HashMap<K, Double>();
+ HashMap<K, Integer> ituples = new HashMap<K, Integer>();
+ HashMap<K, Float> ftuples = new HashMap<K, Float>();
+ HashMap<K, Long> ltuples = new HashMap<K, Long>();
+ HashMap<K, Short> stuples = new HashMap<K, Short>();
- for (Map.Entry<K, MutableDouble> e : sums.entrySet()) {
- K key = e.getKey();
- MutableDouble val = e.getValue();
- tuples.put(key, getValue(val.doubleValue()));
- dtuples.put(key, val.doubleValue());
- ituples.put(key, val.intValue());
- ftuples.put(key, val.floatValue());
- ltuples.put(key, val.longValue());
- stuples.put(key, val.shortValue());
- // ctuples.put(key, counts.get(e.getKey()).toInteger());
- MutableInt c = counts.get(e.getKey());
- if (c != null) {
- ctuples.put(key, c.toInteger());
- }
- }
+ for (Map.Entry<K, MutableDouble> e : sums.entrySet()) {
+ K key = e.getKey();
+ MutableDouble val = e.getValue();
+ tuples.put(key, getValue(val.doubleValue()));
+ dtuples.put(key, val.doubleValue());
+ ituples.put(key, val.intValue());
+ ftuples.put(key, val.floatValue());
+ ltuples.put(key, val.longValue());
+ stuples.put(key, val.shortValue());
+ // ctuples.put(key, counts.get(e.getKey()).toInteger());
+ MutableInt c = counts.get(e.getKey());
+ if (c != null) {
+ ctuples.put(key, c.toInteger());
+ }
+ }
- sum.emit(tuples);
- sumDouble.emit(dtuples);
- sumInteger.emit(ituples);
- sumLong.emit(ltuples);
- sumShort.emit(stuples);
- sumFloat.emit(ftuples);
- count.emit(ctuples);
- clearCache();
- }
+ sum.emit(tuples);
+ sumDouble.emit(dtuples);
+ sumInteger.emit(ituples);
+ sumLong.emit(ltuples);
+ sumShort.emit(stuples);
+ sumFloat.emit(ftuples);
+ count.emit(ctuples);
+ clearCache();
+ }
- /**
- * Clear sum maps.
- */
- private void clearCache()
- {
- if (!cumulative) {
- sums.clear();
- counts.clear();
- }
- }
+ /**
+ * Clear sum maps.
+ */
+ private void clearCache()
+ {
+ if (!cumulative) {
+ sums.clear();
+ counts.clear();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/SumKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/SumKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/SumKeyVal.java
index d79a490..99e2492 100644
--- a/library/src/main/java/com/datatorrent/lib/math/SumKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/SumKeyVal.java
@@ -23,13 +23,12 @@ import java.util.Map;
import org.apache.commons.lang.mutable.MutableDouble;
-import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
+import com.datatorrent.lib.util.KeyValPair;
/**
* Emits the sum of values for each key at the end of window.
@@ -93,8 +92,7 @@ public class SumKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K
SumEntry val = sums.get(key);
if (val == null) {
val = new SumEntry(new MutableDouble(tuple.getValue().doubleValue()), true);
- }
- else {
+ } else {
val.sum.add(tuple.getValue().doubleValue());
val.changed = true;
}
@@ -194,12 +192,11 @@ public class SumKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K
public void clearCache()
{
if (cumulative) {
- for (Map.Entry<K, SumEntry> e: sums.entrySet()) {
+ for (Map.Entry<K, SumEntry> e : sums.entrySet()) {
SumEntry val = e.getValue();
val.changed = false;
}
- }
- else {
+ } else {
sums.clear();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProduct.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProduct.java b/library/src/main/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProduct.java
index adc77fa..7f36ef5 100644
--- a/library/src/main/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProduct.java
+++ b/library/src/main/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProduct.java
@@ -18,12 +18,13 @@
*/
package com.datatorrent.lib.math;
-import com.datatorrent.api.DefaultOutputPort;
-import org.xml.sax.InputSource;
-
import java.io.StringReader;
import java.util.List;
+import org.xml.sax.InputSource;
+
+import com.datatorrent.api.DefaultOutputPort;
+
/**
* An implementation of the AbstractXmlKeyValueCartesianProduct operator that takes in the xml document
* as a String input and outputs the cartesian product as Strings.
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/multiwindow/AbstractSlidingWindow.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/multiwindow/AbstractSlidingWindow.java b/library/src/main/java/com/datatorrent/lib/multiwindow/AbstractSlidingWindow.java
index 231285f..7217086 100644
--- a/library/src/main/java/com/datatorrent/lib/multiwindow/AbstractSlidingWindow.java
+++ b/library/src/main/java/com/datatorrent/lib/multiwindow/AbstractSlidingWindow.java
@@ -22,9 +22,9 @@ import java.util.ArrayList;
import javax.validation.constraints.Min;
-import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.common.util.BaseOperator;
/**
*
@@ -47,108 +47,108 @@ public abstract class AbstractSlidingWindow<T, S> extends BaseOperator
/**
* Input port for getting incoming data.
*/
- public final transient DefaultInputPort<T> data = new DefaultInputPort<T>()
- {
- @Override
- public void process(T tuple)
- {
- processDataTuple(tuple);
- }
- };
-
- protected ArrayList<S> states = null;
-
- protected S lastExpiredWindowState = null;
-
- protected int currentCursor = -1;
-
- @Min(2)
- int windowSize = 2;
-
- /**
- * getter function for n (number of previous window states
- *
- * @return n
- */
- @Min(2)
- public int getWindowSize()
- {
- return windowSize;
- }
-
- /**
- * setter for windowSize
- *
- * @param i
- */
- public void setWindowSize(int windowSize)
- {
- this.windowSize = windowSize;
- }
-
- abstract protected void processDataTuple(T tuple);
-
- /**
- * Implement this method to create the state object needs to be kept in the sliding window
- *
- * @return the state of current streaming window
- */
- public abstract S createWindowState();
-
- /**
- * Get the Streaming window state in it's coming the order start from 0
- *
- * @param i
- * 0 the state of the first coming streaming window
- * -1 the state of the last expired streaming window
- * @return State of the streaming window
- * @throws ArrayIndexOutOfBoundsException if i >= sliding window size
- */
- public S getStreamingWindowState(int i)
- {
- if(i == -1){
- return lastExpiredWindowState;
- }
- if (i >= getWindowSize()) {
- throw new ArrayIndexOutOfBoundsException();
- }
- int index = (currentCursor + 1 + i) % windowSize ;
- return states.get(index);
- }
-
- /**
- * Moves states by 1 and sets current state to null. If you override
- * beginWindow, you must call super.beginWindow(windowId) to ensure proper
- * operator behavior.
- *
- * @param windowId
- */
- @Override
- public void beginWindow(long windowId)
- {
- // move currentCursor 1 position
- currentCursor = (currentCursor + 1) % windowSize;
- // expire the state at the first position which is the state of the streaming window moving out of the current application window
- lastExpiredWindowState = states.get(currentCursor);
-
- states.set(currentCursor, createWindowState());
-
- }
-
- /**
- * Sets up internal state structure
- *
- * @param context
- */
- @Override
- public void setup(OperatorContext context)
- {
- super.setup(context);
- states = new ArrayList<S>(windowSize);
- //initialize the sliding window state to null
- for (int i = 0; i < windowSize; i++) {
- states.add(null);
- }
- currentCursor = -1;
- }
+ public final transient DefaultInputPort<T> data = new DefaultInputPort<T>()
+ {
+ @Override
+ public void process(T tuple)
+ {
+ processDataTuple(tuple);
+ }
+ };
+
+ protected ArrayList<S> states = null;
+
+ protected S lastExpiredWindowState = null;
+
+ protected int currentCursor = -1;
+
+ @Min(2)
+ int windowSize = 2;
+
+ /**
+ * Getter for windowSize (number of previous window states).
+ *
+ * @return windowSize
+ */
+ @Min(2)
+ public int getWindowSize()
+ {
+ return windowSize;
+ }
+
+ /**
+ * setter for windowSize
+ *
+ * @param windowSize
+ */
+ public void setWindowSize(int windowSize)
+ {
+ this.windowSize = windowSize;
+ }
+
+ protected abstract void processDataTuple(T tuple);
+
+ /**
+ * Implement this method to create the state object that needs to be kept in the sliding window
+ *
+ * @return the state of current streaming window
+ */
+ public abstract S createWindowState();
+
+ /**
+ * Get the streaming window state in its arrival order, starting from 0
+ *
+ * @param i
+ * 0 the state of the first coming streaming window
+ * -1 the state of the last expired streaming window
+ * @return State of the streaming window
+ * @throws ArrayIndexOutOfBoundsException if i >= sliding window size
+ */
+ public S getStreamingWindowState(int i)
+ {
+ if (i == -1) {
+ return lastExpiredWindowState;
+ }
+ if (i >= getWindowSize()) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+ int index = (currentCursor + 1 + i) % windowSize;
+ return states.get(index);
+ }
+
+ /**
+ * Moves states by 1 and sets current state to a new window state. If you override
+ * beginWindow, you must call super.beginWindow(windowId) to ensure proper
+ * operator behavior.
+ *
+ * @param windowId
+ */
+ @Override
+ public void beginWindow(long windowId)
+ {
+ // move currentCursor 1 position
+ currentCursor = (currentCursor + 1) % windowSize;
+ // expire the state at the first position which is the state of the streaming window moving out of the current application window
+ lastExpiredWindowState = states.get(currentCursor);
+
+ states.set(currentCursor, createWindowState());
+
+ }
+
+ /**
+ * Sets up internal state structure
+ *
+ * @param context
+ */
+ @Override
+ public void setup(OperatorContext context)
+ {
+ super.setup(context);
+ states = new ArrayList<S>(windowSize);
+ //initialize the sliding window state to null
+ for (int i = 0; i < windowSize; i++) {
+ states.add(null);
+ }
+ currentCursor = -1;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/multiwindow/AbstractSlidingWindowKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/multiwindow/AbstractSlidingWindowKeyVal.java b/library/src/main/java/com/datatorrent/lib/multiwindow/AbstractSlidingWindowKeyVal.java
index 35e777b..69e8819 100644
--- a/library/src/main/java/com/datatorrent/lib/multiwindow/AbstractSlidingWindowKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/multiwindow/AbstractSlidingWindowKeyVal.java
@@ -52,104 +52,106 @@ import com.datatorrent.lib.util.KeyValPair;
* @since 0.3.3
*/
public abstract class AbstractSlidingWindowKeyVal<K, V extends Number, S extends SimpleMovingAverageObject>
- extends BaseNumberKeyValueOperator<K, V>
+ extends BaseNumberKeyValueOperator<K, V>
{
- /**
- * buffer to hold state information of different windows.
- */
- protected HashMap<K, ArrayList<S>> buffer = new HashMap<K, ArrayList<S>>();
- /**
- * Index of windows stating at 0.
- */
- protected int currentstate = -1;
+ /**
+ * buffer to hold state information of different windows.
+ */
+ protected HashMap<K, ArrayList<S>> buffer = new HashMap<K, ArrayList<S>>();
+ /**
+ * Index of windows starting at 0.
+ */
+ protected int currentstate = -1;
- /**
- * Concrete class has to implement how they want the tuple to be processed.
- *
- * @param tuple
- * a keyVal pair of tuple.
- */
- public abstract void processDataTuple(KeyValPair<K, V> tuple);
+ /**
+ * Concrete class has to implement how they want the tuple to be processed.
+ *
+ * @param tuple
+ * a keyVal pair of tuple.
+ */
+ public abstract void processDataTuple(KeyValPair<K, V> tuple);
- /**
- * Concrete class has to implement what to emit at the end of window.
- *
- * @param key
- * @param obj
- */
- public abstract void emitTuple(K key, ArrayList<S> obj);
+ /**
+ * Concrete class has to implement what to emit at the end of window.
+ *
+ * @param key
+ * @param obj
+ */
+ public abstract void emitTuple(K key, ArrayList<S> obj);
- /**
- * Length of sliding windows. Minimum value is 2.
- */
- @Min(2)
- protected int windowSize = 2;
- protected long windowId;
+ /**
+ * Length of sliding windows. Minimum value is 2.
+ */
+ @Min(2)
+ protected int windowSize = 2;
+ protected long windowId;
- /**
- * Getter function for windowSize (number of previous window buffer).
- *
- * @return windowSize
- */
- public int getWindowSize()
- {
- return windowSize;
- }
+ /**
+ * Getter function for windowSize (number of previous window buffer).
+ *
+ * @return windowSize
+ */
+ public int getWindowSize()
+ {
+ return windowSize;
+ }
- /**
- * @param windowSize
- */
- public void setWindowSize(int windowSize)
- {
- this.windowSize = windowSize;
- }
+ /**
+ * @param windowSize
+ */
+ public void setWindowSize(int windowSize)
+ {
+ this.windowSize = windowSize;
+ }
- /**
- * Input port for getting incoming data.
- */
- public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
- {
- @Override
- public void process(KeyValPair<K, V> tuple)
- {
- processDataTuple(tuple);
- }
- };
+ /**
+ * Input port for getting incoming data.
+ */
+ public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
+ {
+ @Override
+ public void process(KeyValPair<K, V> tuple)
+ {
+ processDataTuple(tuple);
+ }
+ };
- /**
- * Moves buffer by 1 and clear contents of current. If you override
- * beginWindow, you must call super.beginWindow(windowId) to ensure proper
- * operator behavior.
- *
- * @param windowId
- */
- @Override
- public void beginWindow(long windowId)
- {
- this.windowId = windowId;
- currentstate++;
- if (currentstate >= windowSize) {
- for (Map.Entry<K, ArrayList<S>> e : buffer.entrySet()) {
- ArrayList<S> states = e.getValue();
- S first = states.get(0);
- for (int i=1; i < windowSize; i++) states.set(i-1, states.get(i));
- states.set(windowSize-1, first);
- }
- currentstate = windowSize-1;
- }
- for (Map.Entry<K, ArrayList<S>> e : buffer.entrySet()) {
- e.getValue().get(currentstate).clear();
- }
- }
+ /**
+ * Moves buffer by 1 and clears contents of current. If you override
+ * beginWindow, you must call super.beginWindow(windowId) to ensure proper
+ * operator behavior.
+ *
+ * @param windowId
+ */
+ @Override
+ public void beginWindow(long windowId)
+ {
+ this.windowId = windowId;
+ currentstate++;
+ if (currentstate >= windowSize) {
+ for (Map.Entry<K, ArrayList<S>> e : buffer.entrySet()) {
+ ArrayList<S> states = e.getValue();
+ S first = states.get(0);
+ for (int i = 1; i < windowSize; i++) {
+ states.set(i - 1, states.get(i));
+ }
+ states.set(windowSize - 1, first);
+ }
+ currentstate = windowSize - 1;
+ }
+ for (Map.Entry<K, ArrayList<S>> e : buffer.entrySet()) {
+ e.getValue().get(currentstate).clear();
+ }
+ }
- /**
- * Emit tuple for each key.
- */
- @Override
- public void endWindow()
- {
- for (Map.Entry<K, ArrayList<S>> e : buffer.entrySet()) {
- emitTuple(e.getKey(), e.getValue());
- }
- }
+ /**
+ * Emit tuple for each key.
+ */
+ @Override
+ public void endWindow()
+ {
+ for (Map.Entry<K, ArrayList<S>> e : buffer.entrySet()) {
+ emitTuple(e.getKey(), e.getValue());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/multiwindow/MultiWindowRangeKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/multiwindow/MultiWindowRangeKeyVal.java b/library/src/main/java/com/datatorrent/lib/multiwindow/MultiWindowRangeKeyVal.java
index 09a9c24..ad4f8e5 100644
--- a/library/src/main/java/com/datatorrent/lib/multiwindow/MultiWindowRangeKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/multiwindow/MultiWindowRangeKeyVal.java
@@ -71,7 +71,7 @@ public class MultiWindowRangeKeyVal<K, V extends Number> extends RangeKeyVal<K,
* Clears the internal data before return
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
- @Override
+ @Override
public void endWindow()
{
boolean emit = (++windowCount) % windowSize == 0;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/multiwindow/MultiWindowSumKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/multiwindow/MultiWindowSumKeyVal.java b/library/src/main/java/com/datatorrent/lib/multiwindow/MultiWindowSumKeyVal.java
index d6463bd..7d5c377 100644
--- a/library/src/main/java/com/datatorrent/lib/multiwindow/MultiWindowSumKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/multiwindow/MultiWindowSumKeyVal.java
@@ -18,13 +18,14 @@
*/
package com.datatorrent.lib.multiwindow;
+import java.util.Map;
+
+import javax.validation.constraints.Min;
+
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.lib.math.SumKeyVal;
import com.datatorrent.lib.util.KeyValPair;
-import java.util.Map;
-import javax.validation.constraints.Min;
-
/**
* A sum operator of KeyValPair schema which accumulates sum across multiple
* streaming windows.
@@ -53,45 +54,45 @@ import javax.validation.constraints.Min;
@OperatorAnnotation(partitionable = false)
public class MultiWindowSumKeyVal<K, V extends Number> extends SumKeyVal<K, V>
{
- /**
- * Number of streaming window after which tuple got emitted.
- */
- @Min(2)
- private int windowSize = 2;
- private long windowCount = 0;
+ /**
+ * Number of streaming windows after which tuples are emitted.
+ */
+ @Min(2)
+ private int windowSize = 2;
+ private long windowCount = 0;
- public void setWindowSize(int windowSize)
- {
- this.windowSize = windowSize;
- }
+ public void setWindowSize(int windowSize)
+ {
+ this.windowSize = windowSize;
+ }
- /**
- * Emit only at the end of windowSize window boundary.
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- @Override
- public void endWindow()
- {
- boolean emit = (++windowCount) % windowSize == 0;
+ /**
+ * Emit only at the end of windowSize window boundary.
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Override
+ public void endWindow()
+ {
+ boolean emit = (++windowCount) % windowSize == 0;
- if (!emit) {
- return;
- }
+ if (!emit) {
+ return;
+ }
- // Emit only at the end of application window boundary.
- boolean dosum = sum.isConnected();
+ // Emit only at the end of application window boundary.
+ boolean dosum = sum.isConnected();
- if (dosum) {
- for (Map.Entry<K, SumEntry> e : sums.entrySet()) {
- K key = e.getKey();
- if (dosum) {
- sum.emit(new KeyValPair(key, getValue(e.getValue().sum.doubleValue())));
- }
- }
- }
+ if (dosum) {
+ for (Map.Entry<K, SumEntry> e : sums.entrySet()) {
+ K key = e.getKey();
+ if (dosum) {
+ sum.emit(new KeyValPair(key, getValue(e.getValue().sum.doubleValue())));
+ }
+ }
+ }
- // Clear cumulative sum at the end of application window boundary.
- sums.clear();
- }
+ // Clear cumulative sum at the end of application window boundary.
+ sums.clear();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/multiwindow/SimpleMovingAverage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/multiwindow/SimpleMovingAverage.java b/library/src/main/java/com/datatorrent/lib/multiwindow/SimpleMovingAverage.java
index c49c2b4..4d0a2c1 100644
--- a/library/src/main/java/com/datatorrent/lib/multiwindow/SimpleMovingAverage.java
+++ b/library/src/main/java/com/datatorrent/lib/multiwindow/SimpleMovingAverage.java
@@ -48,85 +48,85 @@ import com.datatorrent.lib.util.KeyValPair;
*/
@OperatorAnnotation(partitionable = false)
public class SimpleMovingAverage<K, V extends Number> extends
- AbstractSlidingWindowKeyVal<K, V, SimpleMovingAverageObject>
+ AbstractSlidingWindowKeyVal<K, V, SimpleMovingAverageObject>
{
- /**
- * Output port to emit simple moving average (SMA) of last N window as Double.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<KeyValPair<K, Double>> doubleSMA = new DefaultOutputPort<KeyValPair<K, Double>>();
- /**
- * Output port to emit simple moving average (SMA) of last N window as Float.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<KeyValPair<K, Float>> floatSMA = new DefaultOutputPort<KeyValPair<K, Float>>();
- /**
- * Output port to emit simple moving average (SMA) of last N window as Long.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<KeyValPair<K, Long>> longSMA = new DefaultOutputPort<KeyValPair<K, Long>>();
- /**
- * Output port to emit simple moving average (SMA) of last N window as
- * Integer.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<KeyValPair<K, Integer>> integerSMA = new DefaultOutputPort<KeyValPair<K, Integer>>();
+ /**
+ * Output port to emit simple moving average (SMA) of last N window as Double.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<KeyValPair<K, Double>> doubleSMA = new DefaultOutputPort<KeyValPair<K, Double>>();
+ /**
+ * Output port to emit simple moving average (SMA) of last N window as Float.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<KeyValPair<K, Float>> floatSMA = new DefaultOutputPort<KeyValPair<K, Float>>();
+ /**
+ * Output port to emit simple moving average (SMA) of last N window as Long.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<KeyValPair<K, Long>> longSMA = new DefaultOutputPort<KeyValPair<K, Long>>();
+ /**
+ * Output port to emit simple moving average (SMA) of last N window as
+ * Integer.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<KeyValPair<K, Integer>> integerSMA = new DefaultOutputPort<KeyValPair<K, Integer>>();
- /**
- * Create the list if key doesn't exist. Add value to buffer and increment
- * counter.
- *
- * @param tuple
- */
- @Override
- public void processDataTuple(KeyValPair<K, V> tuple)
- {
- K key = tuple.getKey();
- double val = tuple.getValue().doubleValue();
- ArrayList<SimpleMovingAverageObject> dataList = buffer.get(key);
+ /**
+ * Create the list if key doesn't exist. Add value to buffer and increment
+ * counter.
+ *
+ * @param tuple
+ */
+ @Override
+ public void processDataTuple(KeyValPair<K, V> tuple)
+ {
+ K key = tuple.getKey();
+ double val = tuple.getValue().doubleValue();
+ ArrayList<SimpleMovingAverageObject> dataList = buffer.get(key);
- if (dataList == null) {
- dataList = new ArrayList<SimpleMovingAverageObject>(windowSize);
- for (int i = 0; i < windowSize; ++i) {
- dataList.add(new SimpleMovingAverageObject());
- }
- }
+ if (dataList == null) {
+ dataList = new ArrayList<SimpleMovingAverageObject>(windowSize);
+ for (int i = 0; i < windowSize; ++i) {
+ dataList.add(new SimpleMovingAverageObject());
+ }
+ }
- dataList.get(currentstate).add(val); // add to previous value
- buffer.put(key, dataList);
- }
+ dataList.get(currentstate).add(val); // add to previous value
+ buffer.put(key, dataList);
+ }
- /**
- * Calculate average and emit in appropriate port.
- *
- * @param key
- * @param obj
- */
- @Override
- public void emitTuple(K key, ArrayList<SimpleMovingAverageObject> obj)
- {
- double sum = 0;
- int count = 0;
- for (int i = 0; i < windowSize; i++) {
- SimpleMovingAverageObject d = obj.get(i);
- sum += d.getSum();
- count += d.getCount();
- }
+ /**
+ * Calculate average and emit in appropriate port.
+ *
+ * @param key
+ * @param obj
+ */
+ @Override
+ public void emitTuple(K key, ArrayList<SimpleMovingAverageObject> obj)
+ {
+ double sum = 0;
+ int count = 0;
+ for (int i = 0; i < windowSize; i++) {
+ SimpleMovingAverageObject d = obj.get(i);
+ sum += d.getSum();
+ count += d.getCount();
+ }
- if (count == 0) { // Nothing to emit.
- return;
- }
- if (doubleSMA.isConnected()) {
- doubleSMA.emit(new KeyValPair<K, Double>(key, (sum / count)));
- }
- if (floatSMA.isConnected()) {
- floatSMA.emit(new KeyValPair<K, Float>(key, (float) (sum / count)));
- }
- if (longSMA.isConnected()) {
- longSMA.emit(new KeyValPair<K, Long>(key, (long) (sum / count)));
- }
- if (integerSMA.isConnected()) {
- integerSMA.emit(new KeyValPair<K, Integer>(key, (int) (sum / count)));
- }
- }
+ if (count == 0) { // Nothing to emit.
+ return;
+ }
+ if (doubleSMA.isConnected()) {
+ doubleSMA.emit(new KeyValPair<K, Double>(key, (sum / count)));
+ }
+ if (floatSMA.isConnected()) {
+ floatSMA.emit(new KeyValPair<K, Float>(key, (float)(sum / count)));
+ }
+ if (longSMA.isConnected()) {
+ longSMA.emit(new KeyValPair<K, Long>(key, (long)(sum / count)));
+ }
+ if (integerSMA.isConnected()) {
+ integerSMA.emit(new KeyValPair<K, Integer>(key, (int)(sum / count)));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/multiwindow/SortedMovingWindow.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/multiwindow/SortedMovingWindow.java b/library/src/main/java/com/datatorrent/lib/multiwindow/SortedMovingWindow.java
index 8d3228b..892c822 100644
--- a/library/src/main/java/com/datatorrent/lib/multiwindow/SortedMovingWindow.java
+++ b/library/src/main/java/com/datatorrent/lib/multiwindow/SortedMovingWindow.java
@@ -28,10 +28,12 @@ import java.util.PriorityQueue;
import javax.validation.constraints.NotNull;
+import org.apache.commons.lang.ClassUtils;
+
+import com.google.common.base.Function;
+
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.google.common.base.Function;
-import org.apache.commons.lang.ClassUtils;
/**
*
@@ -102,7 +104,7 @@ public class SortedMovingWindow<T, K> extends AbstractSlidingWindow<T, List<T>>
{
super.endWindow();
tuplesInCurrentStreamWindow = new LinkedList<T>();
- if(lastExpiredWindowState == null){
+ if (lastExpiredWindowState == null) {
// not ready to emit value or empty in a certain window
return;
}
@@ -115,7 +117,7 @@ public class SortedMovingWindow<T, K> extends AbstractSlidingWindow<T, List<T>>
int k = 0;
if (comparator == null) {
if (expiredTuple instanceof Comparable) {
- k = ((Comparable<T>) expiredTuple).compareTo(minElemInSortedList);
+ k = ((Comparable<T>)expiredTuple).compareTo(minElemInSortedList);
} else {
errorOutput.emit(expiredTuple);
throw new IllegalArgumentException("Operator \"" + ClassUtils.getShortClassName(this.getClass()) + "\" encounters an invalid tuple " + expiredTuple + "\nNeither the tuple is comparable Nor Comparator is specified!");
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/partitioner/StatsAwareStatelessPartitioner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/partitioner/StatsAwareStatelessPartitioner.java b/library/src/main/java/com/datatorrent/lib/partitioner/StatsAwareStatelessPartitioner.java
index 6e2e01f..ed571b4 100644
--- a/library/src/main/java/com/datatorrent/lib/partitioner/StatsAwareStatelessPartitioner.java
+++ b/library/src/main/java/com/datatorrent/lib/partitioner/StatsAwareStatelessPartitioner.java
@@ -21,20 +21,24 @@ package com.datatorrent.lib.partitioner;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import javax.validation.constraints.Min;
-import com.google.common.collect.Sets;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Sets;
+
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StatsListener;
-
import com.datatorrent.common.partitioner.StatelessPartitioner;
/**
@@ -122,13 +126,11 @@ public abstract class StatsAwareStatelessPartitioner<T extends Operator> impleme
response.repartitionRequired = true;
logger.debug("setting repartition to true");
- }
- else if (!repartition) {
+ } else if (!repartition) {
repartition = true;
nextMillis = System.currentTimeMillis() + cooldownMillis;
}
- }
- else {
+ } else {
repartition = false;
}
return response;
@@ -147,8 +149,7 @@ public abstract class StatsAwareStatelessPartitioner<T extends Operator> impleme
nextMillis = partitionNextMillis;
// delegate to create initial list of partitions
return new StatelessPartitioner<T>(initialPartitionCount).definePartitions(partitions, context);
- }
- else {
+ } else {
// repartition call
logger.debug("repartition call for operator");
if (System.currentTimeMillis() < partitionNextMillis) {
@@ -169,8 +170,7 @@ public abstract class StatsAwareStatelessPartitioner<T extends Operator> impleme
Partition<T> siblingPartition = lowLoadPartitions.remove(partitionKey & reducedMask);
if (siblingPartition == null) {
lowLoadPartitions.put(partitionKey & reducedMask, p);
- }
- else {
+ } else {
// both of the partitions are low load, combine
PartitionKeys newPks = new PartitionKeys(reducedMask, Sets.newHashSet(partitionKey & reducedMask));
// put new value so the map gets marked as modified
@@ -181,8 +181,7 @@ public abstract class StatsAwareStatelessPartitioner<T extends Operator> impleme
//LOG.debug("partition keys after merge {}", siblingPartition.getPartitionKeys());
}
}
- }
- else if (load > 0) {
+ } else if (load > 0) {
// split bottlenecks
Map<Operator.InputPort<?>, PartitionKeys> keys = p.getPartitionKeys();
Map.Entry<Operator.InputPort<?>, PartitionKeys> e = keys.entrySet().iterator().next();
@@ -196,8 +195,7 @@ public abstract class StatsAwareStatelessPartitioner<T extends Operator> impleme
int key = e.getValue().partitions.iterator().next();
int key2 = (newMask ^ e.getValue().mask) | key;
newKeys = Sets.newHashSet(key, key2);
- }
- else {
+ } else {
// assign keys to separate partitions
newMask = e.getValue().mask;
newKeys = e.getValue().partitions;
@@ -208,8 +206,7 @@ public abstract class StatsAwareStatelessPartitioner<T extends Operator> impleme
newPartition.getPartitionKeys().put(e.getKey(), new PartitionKeys(newMask, Sets.newHashSet(key)));
newPartitions.add(newPartition);
}
- }
- else {
+ } else {
// leave unchanged
newPartitions.add(p);
}
@@ -229,8 +226,8 @@ public abstract class StatsAwareStatelessPartitioner<T extends Operator> impleme
partitionedInstanceStatus.clear();
for (Map.Entry<Integer, Partition<T>> entry : partitions.entrySet()) {
if (partitionedInstanceStatus.containsKey(entry.getKey())) {
- }
- else {
+ //FIXME
+ } else {
partitionedInstanceStatus.put(entry.getKey(), null);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/script/JavaScriptOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/script/JavaScriptOperator.java b/library/src/main/java/com/datatorrent/lib/script/JavaScriptOperator.java
index 1c5c96a..4e607f1 100644
--- a/library/src/main/java/com/datatorrent/lib/script/JavaScriptOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/script/JavaScriptOperator.java
@@ -18,14 +18,22 @@
*/
package com.datatorrent.lib.script;
-import com.datatorrent.api.Context.OperatorContext;
-
import java.util.HashMap;
import java.util.Map;
-import javax.script.*;
+
+import javax.script.Invocable;
+import javax.script.ScriptContext;
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineManager;
+import javax.script.ScriptException;
+import javax.script.SimpleBindings;
+import javax.script.SimpleScriptContext;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.api.Context.OperatorContext;
+
/**
* An implementation of ScriptOperator that executes JavaScript on tuples input for Map <String, Object>.
*
@@ -46,7 +54,7 @@ import org.slf4j.LoggerFactory;
*
* // Connect to output console operator
* ConsoleOutputOperator console = dag.addOperator("console",
- * new ConsoleOutputOperator());
+ * new ConsoleOutputOperator());
* dag.addStream("rand_console", script.result, console.input);
*
* </pre>
@@ -54,17 +62,17 @@ import org.slf4j.LoggerFactory;
* <b> Sample Input Operator(emit)</b>
*
* <pre>
- * .
- * .
- * public void emitTuples() {
- * HashMap<String, Object> map = new HashMap<String, Object>();
- * map.put("val", random.nextInt());
- * outport.emit(map);
- * .
- * .
- * }
- * .
- * .
+ * .
+ * .
+ * public void emitTuples() {
+ * HashMap<String, Object> map = new HashMap<String, Object>();
+ * map.put("val", random.nextInt());
+ * outport.emit(map);
+ * .
+ * .
+ * }
+ * .
+ * .
* </pre>
*
* This operator does not checkpoint interpreted functions in the variable bindings because they are not serializable
@@ -80,9 +88,8 @@ public class JavaScriptOperator extends ScriptOperator
public enum Type
{
-
EVAL, INVOKE
- };
+ }
protected transient ScriptEngineManager sem = new ScriptEngineManager();
protected transient ScriptEngine engine = sem.getEngineByName("JavaScript");
@@ -108,6 +115,8 @@ public class JavaScriptOperator extends ScriptOperator
case INVOKE:
evalResult = ((Invocable)engine).invokeFunction(script);
break;
+ default:
+ //fallthru
}
if (isPassThru && result.isConnected()) {
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java b/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java
index f10b04c..9532180 100644
--- a/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java
@@ -18,16 +18,18 @@
*/
package com.datatorrent.lib.script;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+
import javax.validation.constraints.NotNull;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
/**
* A base implementation of a BaseOperator for language script operator. Subclasses should provide the
implementation of getting the bindings and process method.
@@ -96,5 +98,6 @@ public abstract class ScriptOperator extends BaseOperator
}
public abstract void process(Map<String, Object> tuple);
+
public abstract Map<String, Object> getBindings();
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/statistics/MedianOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/statistics/MedianOperator.java b/library/src/main/java/com/datatorrent/lib/statistics/MedianOperator.java
index ebbe94a..95debb4 100644
--- a/library/src/main/java/com/datatorrent/lib/statistics/MedianOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/statistics/MedianOperator.java
@@ -21,10 +21,10 @@ package com.datatorrent.lib.statistics;
import java.util.ArrayList;
import java.util.Collections;
-import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.common.util.BaseOperator;
/**
* An implementation of BaseOperator that computes median of incoming data. <br>
@@ -77,7 +77,9 @@ public class MedianOperator extends BaseOperator
@Override
public void endWindow()
{
- if (values.size() == 0) return;
+ if (values.size() == 0) {
+ return;
+ }
if (values.size() == 1) {
median.emit(values.get(0));
return;
@@ -86,9 +88,9 @@ public class MedianOperator extends BaseOperator
// median value
Collections.sort(values);
int medianIndex = values.size() / 2;
- if (values.size() %2 == 0) {
- Double value = values.get(medianIndex-1);
- value = (value + values.get(medianIndex))/2;
+ if (values.size() % 2 == 0) {
+ Double value = values.get(medianIndex - 1);
+ value = (value + values.get(medianIndex)) / 2;
median.emit(value);
} else {
median.emit(values.get(medianIndex));
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/statistics/ModeOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/statistics/ModeOperator.java b/library/src/main/java/com/datatorrent/lib/statistics/ModeOperator.java
index 84c9c46..f23f7b7 100644
--- a/library/src/main/java/com/datatorrent/lib/statistics/ModeOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/statistics/ModeOperator.java
@@ -21,10 +21,10 @@ package com.datatorrent.lib.statistics;
import java.util.HashMap;
import java.util.Map;
-import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.common.util.BaseOperator;
/**
* An implementation of BaseOperator that calculates most frequent value occurring in incoming data. <br>
@@ -63,7 +63,7 @@ public class ModeOperator<V extends Comparable<?>> extends BaseOperator
{
if (values.containsKey(tuple)) {
Integer count = values.remove(tuple);
- values.put(tuple, count+1);
+ values.put(tuple, count + 1);
} else {
values.put(tuple, 1);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/statistics/StandardDeviation.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/statistics/StandardDeviation.java b/library/src/main/java/com/datatorrent/lib/statistics/StandardDeviation.java
index 0a47450..8aaf63e 100644
--- a/library/src/main/java/com/datatorrent/lib/statistics/StandardDeviation.java
+++ b/library/src/main/java/com/datatorrent/lib/statistics/StandardDeviation.java
@@ -20,11 +20,11 @@ package com.datatorrent.lib.statistics;
import java.util.ArrayList;
-import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
/**
* An implementation of BaseOperator that computes variance and standard deviation over incoming data. <br>
@@ -67,7 +67,7 @@ public class StandardDeviation extends BaseOperator
/**
* Variance output port.
*/
- @OutputPortFieldAnnotation(optional=true)
+ @OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<Number> variance = new DefaultOutputPort<Number>();
/**
@@ -82,19 +82,21 @@ public class StandardDeviation extends BaseOperator
public void endWindow()
{
// no values.
- if (values.size() == 0) return;
+ if (values.size() == 0) {
+ return;
+ }
// get mean first.
double mean = 0.0;
for (Double value : values) {
mean += value;
}
- mean = mean/values.size();
+ mean = mean / values.size();
// get variance
double outVal = 0.0;
for (Double value : values) {
- outVal += (value-mean)*(value-mean);
+ outVal += (value - mean) * (value - mean);
}
outVal = outVal / values.size();
if (variance.isConnected()) {