You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/09/01 18:30:49 UTC
[05/12] apex-malhar git commit: Updated algo & working on math
operators
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java
deleted file mode 100644
index b0d2e77..0000000
--- a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-
-import javax.validation.constraints.Min;
-
-import org.apache.commons.lang.mutable.MutableDouble;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- * Operator compares consecutive values arriving at input port mapped by keys, emits <key,percent change> pair on output alert port if percent change exceeds percentage threshold set in operator.
- * <p>
- * StateFull : Yes, current key/value is stored in operator for comparison in
- * next successive windows. <br>
- * Partition(s): No, base comparison value will be inconsistent across
- * instantiated copies. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects KeyValPair<K,V extends Number><br>
- * <b>alert</b>: emits KeyValPair<K,KeyValPair<V,Double>>(1)<br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>percentThreshold</b>: The percent change between consecutive tuples of the
- * same key above which an alert tuple is emitted<br>
- * <b>inverse</b>: if set to true the key in the filter will block tuple<br>
- * <b>filterBy</b>: List of keys to filter on<br>
- * @displayName Change Alert Key Value
- * @category Rules and Alerts
- * @tags change, key value, numeric, percentage
- * @since 0.3.3
- */
-public class ChangeAlertKeyVal<K, V extends Number> extends
- BaseNumberKeyValueOperator<K, V>
-{
- /**
- * Last value seen per key, used as the baseline for the next comparison. Stateful: nothing in this class clears it.
- */
- private HashMap<K, MutableDouble> basemap = new HashMap<K, MutableDouble>();
-
- /**
- * Input data port that takes a key value pair.
- */
- public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
- {
- /**
- * Compares the tuple value with the previous value of its key and emits an alert if the absolute percent change exceeds percentThreshold.
- */
- @Override
- public void process(KeyValPair<K, V> tuple)
- {
- K key = tuple.getKey();
- double tval = tuple.getValue().doubleValue();
- MutableDouble val = basemap.get(key);
- if (!doprocessKey(key)) { // presumably the filterBy/inverse key filter of the base class - confirm
- return;
- }
- if (val == null) { // First value seen for this key: store it as the baseline, nothing to compare yet
- val = new MutableDouble(tval);
- basemap.put(cloneKey(key), val);
- return;
- }
- double change = tval - val.doubleValue();
- double percent = (change / val.doubleValue()) * 100; // Infinity or NaN when the previous value is 0
- if (percent < 0.0) { // use the magnitude of the change
- percent = 0.0 - percent;
- }
- if (percent > percentThreshold) {
- KeyValPair<V, Double> dmap = new KeyValPair<V, Double>(
- cloneValue(tuple.getValue()), percent);
- KeyValPair<K, KeyValPair<V, Double>> otuple = new KeyValPair<K, KeyValPair<V, Double>>(
- cloneKey(key), dmap);
- alert.emit(otuple);
- }
- val.setValue(tval); // current value becomes the baseline for the next tuple of this key
- }
- };
-
- /**
- * Alert output port; emits the key paired with a (current value, absolute percent change) pair.
- */
- public final transient DefaultOutputPort<KeyValPair<K, KeyValPair<V, Double>>> alert = new DefaultOutputPort<KeyValPair<K, KeyValPair<V, Double>>>();
-
- /**
- * Alert threshold percentage set by application.
- */
- @Min(1)
- private double percentThreshold = 0.0; // NOTE(review): default is below the @Min(1) bound - presumably must be set explicitly; confirm
-
- /**
- * Returns the alert threshold.
- *
- * @return percent change above which an alert is emitted
- */
- @Min(1)
- public double getPercentThreshold()
- {
- return percentThreshold;
- }
-
- /**
- * Sets the alert threshold, in percent, to {@code d}; the value is stored as given.
- */
- public void setPercentThreshold(double d)
- {
- percentThreshold = d;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java b/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java
deleted file mode 100644
index e212a2d..0000000
--- a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.validation.constraints.Min;
-
-import org.apache.commons.lang.mutable.MutableDouble;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
-
-/**
- * Operator stores <key,value> pair in hash map across the windows for comparison and emits hash map of <key,percent change in value for each key> if percent change
- * exceeds preset threshold.
- * <p>
- *
- * StateFull : Yes, key/value pair in current window are stored for comparison in next window. <br>
- * Partition : No, will yield wrong result, base value won't be consistent across instances. <br>
- *
- * <b>Ports</b>:<br>
- * <b>data</b>: expects Map<K,V extends Number><br>
- * <b>alert</b>: emits HashMap<K,HashMap<V,Double>>(1)<br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>percentThreshold</b>: The percent change between consecutive tuples of the same key above which an alert tuple is emitted<br>
- * <b>inverse</b>: if set to true the key in the filter will block tuple<br>
- * <b>filterBy</b>: List of keys to filter on<br>
- * @displayName Change Alert Map
- * @category Rules and Alerts
- * @tags change, key value, numeric, percentage, map
- * @since 0.3.2
- */
-public class ChangeAlertMap<K, V extends Number> extends BaseNumberKeyValueOperator<K, V>
-{
- /**
- * Input data port that takes a map of <key,value>.
- */
- public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>()
- {
- /**
- * For each entry, compares the value with the previous value of its key and emits an alert if the absolute percent change exceeds percentThreshold.
- */
- @Override
- public void process(Map<K, V> tuple)
- {
- for (Map.Entry<K, V> e: tuple.entrySet()) {
- MutableDouble val = basemap.get(e.getKey());
- if (!doprocessKey(e.getKey())) { // presumably the filterBy/inverse key filter of the base class - confirm
- continue;
- }
- if (val == null) { // First value seen for this key: store it as the baseline, nothing to compare yet
- val = new MutableDouble(e.getValue().doubleValue());
- basemap.put(cloneKey(e.getKey()), val);
- continue;
- }
- double change = e.getValue().doubleValue() - val.doubleValue();
- double percent = (change / val.doubleValue()) * 100; // Infinity or NaN when the previous value is 0
- if (percent < 0.0) { // use the magnitude of the change
- percent = 0.0 - percent;
- }
- if (percent > percentThreshold) {
- HashMap<V,Double> dmap = new HashMap<V,Double>(1);
- dmap.put(cloneValue(e.getValue()), percent);
- HashMap<K,HashMap<V,Double>> otuple = new HashMap<K,HashMap<V,Double>>(1);
- otuple.put(cloneKey(e.getKey()), dmap);
- alert.emit(otuple); // one single-entry map is emitted per alerting key
- }
- val.setValue(e.getValue().doubleValue()); // current value becomes the baseline for the next occurrence of this key
- }
- }
- };
-
- // Default "pass through" unifier works as tuple is emitted as pass through
- /**
- * Output port which emits, per alerting key, a map of key to a single-entry map of (current value, absolute percent change).
- */
- public final transient DefaultOutputPort<HashMap<K, HashMap<V,Double>>> alert = new DefaultOutputPort<HashMap<K, HashMap<V,Double>>>();
-
- /**
- * Last value seen per key, used as the baseline for the next comparison. Stateful: nothing in this class clears it.
- */
- private HashMap<K,MutableDouble> basemap = new HashMap<K,MutableDouble>();
- @Min(1)
- private double percentThreshold = 0.0; // NOTE(review): default is below the @Min(1) bound - presumably must be set explicitly; confirm
-
- /**
- * Returns the alert threshold.
- * @return percent change above which an alert is emitted
- */
- @Min(1)
- public double getPercentThreshold()
- {
- return percentThreshold;
- }
-
- /**
- * Sets the alert threshold, in percent, to {@code d}; the value is stored as given.
- */
- public void setPercentThreshold(double d)
- {
- percentThreshold = d;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java
deleted file mode 100644
index 3f77052..0000000
--- a/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-
-import org.apache.commons.lang.mutable.MutableDouble;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-
-import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- * Operator compares <key,value> pairs arriving at data and base input ports and stores <key,value> pairs arriving at base port in hash map across the windows.
- * <p/>
- * The <key,value> pairs that arrive at data port are compared with base value if the key exists in the hash map.
- * Change value and percentage are emitted on separate ports.
- * StateFull : Yes, base map values are stored across windows. <br>
- * Partitions : Yes, values on the base port are replicated across all partitions. However the order of tuples on the
- * output stream may change.
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects KeyValPair<K,V extends Number><br>
- * <b>base</b>: expects KeyValPair<K,V extends Number><br>
- * <b>change</b>: emits KeyValPair<K,V>(1)<br>
- * <b>percent</b>: emits KeyValPair<K,Double>(1)<br>
- * <br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>inverse</b>: if set to true the key in the filter will block tuple<br>
- * <b>filterBy</b>: List of keys to filter on<br>
- *
- * @displayName Change Key Value
- * @category Math
- * @tags change, key value
- * @since 0.3.3
- */
-public class ChangeKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K, V>
-{
- /**
- * Base value per key, written only by the base port. Stateful: nothing in this class clears it.
- */
- private HashMap<K, MutableDouble> basemap = new HashMap<K, MutableDouble>();
-
- /**
- * Input data port that takes key value pairs.
- */
- public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
- {
- /**
- * Emits the change and the percent change of the tuple value relative to the stored base value of its key.
- */
- @Override
- public void process(KeyValPair<K, V> tuple)
- {
- K key = tuple.getKey();
- if (!doprocessKey(key)) { // presumably the filterBy/inverse key filter of the base class - confirm
- return;
- }
- MutableDouble bval = basemap.get(key);
- if (bval != null) { // Only process keys that are in the basemap
- double cval = tuple.getValue().doubleValue() - bval.doubleValue(); // signed difference from the base value
- change.emit(new KeyValPair<K, V>(cloneKey(key), getValue(cval))); // getValue presumably converts the double back to V - confirm
- percent.emit(new KeyValPair<K, Double>(cloneKey(key), (cval / bval.doubleValue()) * 100)); // signed percent of the base value
- }
- }
- };
-
- /**
- * Base value input port, stored in base map for comparison.
- */
- public final transient DefaultInputPort<KeyValPair<K, V>> base = new DefaultInputPort<KeyValPair<K, V>>()
- {
- /**
- * Stores the tuple value as the base for its key, replacing any earlier base.
- * Tuples whose value is zero are ignored, so an existing base is left unchanged.
- */
- @Override
- public void process(KeyValPair<K, V> tuple)
- {
- if (tuple.getValue().doubleValue() != 0.0) { // Zero is never stored as a base, which avoids
- // divide by zero on the data port. Emit an error tuple?
- MutableDouble val = basemap.get(tuple.getKey());
- if (val == null) {
- val = new MutableDouble(0.0);
- basemap.put(cloneKey(tuple.getKey()), val);
- }
- val.setValue(tuple.getValue().doubleValue());
- }
- }
- };
-
- /**
- * Optional output port emitting (key, change from base value) pairs.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<KeyValPair<K, V>> change = new DefaultOutputPort<KeyValPair<K, V>>();
-
- /**
- * Optional output port emitting (key, percent change from base value) pairs.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<KeyValPair<K, Double>> percent = new DefaultOutputPort<KeyValPair<K, Double>>();
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java b/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java
deleted file mode 100644
index 66bd7da..0000000
--- a/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.api.annotation.Stateless;
-import com.datatorrent.lib.algo.MatchMap;
-import com.datatorrent.lib.util.UnifierHashMap;
-
-/**
- * Operator compares based on the property "key", "value", and "compare".
- * <p>
- * The comparison is done by getting double value from the Number.
- * Passed tuples are emitted on the output port "compare".
- * Failed tuples are emitted on port "except".
- * Both output ports are optional, but at least one has to be connected.
- * This module is a pass through<br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects Map<K,V><br>
- * <b>compare</b>: emits HashMap<K,V><br>
- * <b>except</b>: emits HashMap<K,V><br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>key</b>: The key on which compare is done<br>
- * <b>value</b>: The value to compare with<br>
- * <b>cmp</b>: The compare function. Supported values are "lte", "lt", "eq", "neq", "gt", "gte". Default is "eq"<br>
- * <br>
- * Compile time checks<br>
- * Key must be non empty<br>
- * Value must be able to convert to a "double"<br>
- * Compare string, if specified, must be one of "lte", "lt", "eq", "neq", "gt", "gte"<br>
- * <b>Specific run time checks</b>:<br>
- * Does the incoming HashMap have the key<br>
- * Is the value of the key a number<br>
- * <p>
- * <b>Benchmarks</b>: Blast as many tuples as possible in inline mode<br>
- * <table border="1" cellspacing=1 cellpadding=1 summary="Benchmark table for CompareExceptMap<K,V extends Number> operator template">
- * <tr><th>In-Bound</th><th>Out-bound</th><th>Comments</th></tr>
- * <tr><td><b>5 Million K,V pairs/s</b></td><td>Each tuple is emitted if emitError is set to true</td><td>In-bound rate determines performance as every tuple is emitted.
- * Immutable tuples were used in the benchmarking. If you use mutable tuples and have lots of keys, the benchmarks may be lower</td></tr>
- * </table><br>
- * <p>
- * <b>Function Table (K=String, V=Integer; emitError=true; key=a; value=3; cmp=eq)</b>:
- * <table border="1" cellspacing=1 cellpadding=1 summary="Function table for CompareExceptMap<K,V extends Number> operator template">
- * <tr><th rowspan=2>Tuple Type (api)</th><th>In-bound (process)</th><th colspan=2>Out-bound (emit)</th></tr>
- * <tr><th><i>data</i>(HashMap<K,V>)</th><th><i>compare</i>(HashMap<K,V>)</th><th><i>except</i>(HashMap<K,V>)</th></tr>
- * <tr><td>Begin Window (beginWindow())</td><td>N/A</td><td>N/A</td><td>N/A</td></tr>
- * <tr><td>Data (process())</td><td>{a=2,b=20,c=1000}</td><td></td><td>{a=2,b=20,c=1000}</td></tr>
- * <tr><td>Data (process())</td><td>{a=3,b=40,c=2}</td><td>{a=3,b=40,c=2}</td><td></td></tr>
- * <tr><td>Data (process())</td><td>{a=10,b=5}</td><td></td><td>{a=10,b=5}</td></tr>
- * <tr><td>Data (process())</td><td>{d=55,b=12}</td><td></td><td>{d=55,b=12}</td></tr>
- * <tr><td>Data (process())</td><td>{d=22,a=4}</td><td></td><td>{d=22,a=4}</td></tr>
- * <tr><td>Data (process())</td><td>{d=4,a=3,g=5,h=44}</td><td>{d=4,a=3,g=5,h=44}</td><td></td></tr>
- * <tr><td>End Window (endWindow())</td><td>N/A</td><td>N/A</td><td>N/A</td></tr>
- * </table>
- * <br>
- * <br>
- * @displayName Compare Except Map
- * @category Math
- * @tags comparison, key value, number, hash map
- * @since 0.3.2
- */
-@Stateless
-public class CompareExceptMap<K, V extends Number> extends MatchMap<K, V>
-{
- /**
- * Alias of the match port (not declared in this class); emits a hashmap of matched tuples after comparison.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<HashMap<K, V>> compare = match;
-
- /**
- * Optional output port for non matching tuples; declares UnifierHashMap as its unifier.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<HashMap<K, V>> except = new DefaultOutputPort<HashMap<K, V>>()
- {
- @Override
- public Unifier<HashMap<K, V>> getUnifier()
- {
- return new UnifierHashMap<K, V>();
- }
- };
-
- /**
- * Emits a clone of the tuple on the compare port, but only if that port is connected
- * @param tuple the tuple to clone and emit
- */
- @Override
- public void tupleMatched(Map<K, V> tuple)
- {
- if (compare.isConnected()) {
- compare.emit(cloneTuple(tuple));
- }
- }
-
- /**
- * Emits a clone of the tuple on the except port, but only if that port is connected
- * @param tuple the tuple to clone and emit
- */
- @Override
- public void tupleNotMatched(Map<K, V> tuple)
- {
- if (except.isConnected()) {
- except.emit(cloneTuple(tuple));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/CompareMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/CompareMap.java b/library/src/main/java/com/datatorrent/lib/math/CompareMap.java
deleted file mode 100644
index 3636207..0000000
--- a/library/src/main/java/com/datatorrent/lib/math/CompareMap.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.Stateless;
-import com.datatorrent.lib.algo.MatchMap;
-
-/**
- * This operator compares tuples subclassed from Number based on the property "key", "value", and "cmp", and matching tuples are emitted.
- * <p>
- * If the tuple passed the test, it is emitted on the output port "compare". The comparison is done by getting double value from the Number.
- * Both output ports are optional, but at least one has to be connected.
- * This module is a pass through<br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects Map<K,V extends Number><br>
- * <b>compare</b>: emits HashMap<K,V><br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>key</b>: The key on which compare is done<br>
- * <b>value</b>: The value to compare with<br>
- * <b>cmp</b>: The compare function. Supported values are "lte", "lt", "eq", "neq", "gt", "gte". Default is "eq"<br>
- * <br>
- * <b>Compile time checks</b>:<br>
- * Key must be non empty<br>
- * Value must be able to convert to a "double"<br>
- * CompareMap string, if specified, must be one of "lte", "lt", "eq", "neq", "gt", "gte"<br>
- * <br>
- * <b>Specific run time checks</b>:<br>
- * Does the incoming HashMap have the key<br>
- * Is the value of the key a number<br>
- * <p>
- * <b>Benchmarks</b>: Blast as many tuples as possible in inline mode<br>
- * <table border="1" cellspacing=1 cellpadding=1 summary="Benchmark table for CompareMap<K,V extends Number> operator template">
- * <tr><th>In-Bound</th><th>Out-bound</th><th>Comments</th></tr>
- * <tr><td><b>8 Million K,V pairs/s</b></td><td>Each matched tuple is emitted</td><td>In-bound rate and number of tuples that match determine performance.
- * Immutable tuples were used in the benchmarking. If you use mutable tuples and have lots of keys, the benchmarks may be lower</td></tr>
- * </table><br>
- * <p>
- * <b>Function Table (K=String, V=Integer; emitError=true; key=a; value=3; cmp=eq)</b>:
- * <table border="1" cellspacing=1 cellpadding=1 summary="Function table for CompareMap<K,V extends Number> operator template">
- * <tr><th rowspan=2>Tuple Type (api)</th><th>In-bound (process)</th><th>Out-bound (emit)</th></tr>
- * <tr><th><i>data</i>(Map<K,V>)</th><th><i>compare</i>(HashMap<K,V>)</th></tr>
- * <tr><td>Begin Window (beginWindow())</td><td>N/A</td><td>N/A</td></tr>
- * <tr><td>Data (process())</td><td>{a=2,b=20,c=1000}</td><td></td></tr>
- * <tr><td>Data (process())</td><td>{a=3,b=40,c=2}</td><td>{a=3,b=40,c=2}</td></tr>
- * <tr><td>Data (process())</td><td>{a=10,b=5}</td><td></td></tr>
- * <tr><td>Data (process())</td><td>{d=55,b=12}</td><td></td></tr>
- * <tr><td>Data (process())</td><td>{d=22,a=4}</td><td></td></tr>
- * <tr><td>Data (process())</td><td>{d=4,a=3,g=5,h=44}</td><td>{d=4,a=3,g=5,h=44}</td></tr>
- * <tr><td>End Window (endWindow())</td><td>N/A</td><td>N/A</td></tr>
- * </table>
- * <br>
- * <br>
- * @displayName Compare Map
- * @category Math
- * @tags comparison, key value, numeric, map
- * @since 0.3.2
- */
-@Stateless
-public class CompareMap<K, V extends Number> extends MatchMap<K,V>
-{
- /**
- * Alias of the match port (not declared in this class); emits a hashmap of matching number tuples after comparison.
- */
- public final transient DefaultOutputPort<HashMap<K, V>> compare = match;
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java
deleted file mode 100644
index d593020..0000000
--- a/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.lang.mutable.MutableInt;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.StreamCodec;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.lib.util.BaseKeyValueOperator;
-import com.datatorrent.lib.util.KeyValPair;
-import com.datatorrent.lib.util.UnifierCountOccurKey;
-
-/**
- * This Operator aggregates occurrence of keys in <key,value> pair at input port. A <key,occurrence count> pair is emitted for each key on the output port at end of window.
- * <p>
- * <br>
- * StateFull : Yes, key occurrence is aggregated within a window; counts are emitted and cleared at end of window. <br>
- * Partitions : Yes, count occurrence unifier at output port. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects KeyValPair<K,V><br>
- * <b>count</b>: emits KeyValPair<K,Integer><br>
- * <br>
- * @displayName Count Key Value
- * @category Math
- * @tags count, key value, aggregate
- * @since 0.3.3
- */
-public class CountKeyVal<K, V> extends BaseKeyValueOperator<K, V>
-{
-
- /**
- * Occurrence count per key; filled by the data port and cleared in endWindow().
- */
- protected HashMap<K, MutableInt> counts = new HashMap<K, MutableInt>();
-
- /**
- * Input data port that takes key value pair.
- */
- public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
- {
- /**
- * Increments the occurrence count of the tuple's key, creating the counter
- * on first sight of the key. The tuple value is not read.
- */
- @Override
- public void process(KeyValPair<K, V> tuple)
- {
- K key = tuple.getKey();
- MutableInt count = counts.get(key);
- if (count == null) {
- count = new MutableInt(0);
- counts.put(cloneKey(key), count);
- }
- count.increment();
- }
-
- @Override
- public StreamCodec<KeyValPair<K, V>> getStreamCodec()
- {
- return getKeyValPairStreamCodec(); // helper not defined in this class (presumably inherited from the base operator)
- }
- };
-
- /**
- * Optional output port emitting (key, occurrence count) pairs; declares UnifierCountOccurKey as its unifier.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<KeyValPair<K, Integer>> count = new DefaultOutputPort<KeyValPair<K, Integer>>()
- {
- @Override
- public UnifierCountOccurKey<K> getUnifier()
- {
- return new UnifierCountOccurKey<K>();
- }
- };
-
- /**
- * Emits one (key, count) pair on the count port for every key counted by
- * the data port, then clears the counts unconditionally; no reset flag is
- * consulted, so counting restarts from zero after each call.
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- @Override
- public void endWindow()
- {
- for (Map.Entry<K, MutableInt> e : counts.entrySet()) {
- count.emit(new KeyValPair(e.getKey(),
- new Integer(e.getValue().intValue()))); // raw KeyValPair, hence the @SuppressWarnings on this method
- }
- counts.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java b/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java
deleted file mode 100644
index ddef880..0000000
--- a/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.Stateless;
-import com.datatorrent.lib.algo.MatchMap;
-import com.datatorrent.lib.util.UnifierHashMap;
-
-/**
- * This operator does comparison on tuple sub-classed from Number based on the property "key", "value", and "cmp", and not matched tuples are emitted.
- * <p>
- * The comparison is done by getting double value from the Number. Both output ports
- * are optional, but at least one has to be connected
- * <p>
- * This module is a pass through<br>
- * <br>
- * <br>
- * StateFull : No, output is emitted in current window. <br>
- * Partitions : Yes, No state dependency among input tuples. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects Map<K,V extends Number><br>
- * <b>except</b>: emits HashMap<K,V><br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>key</b>: The key on which compare is done<br>
- * <b>value</b>: The value to compare with<br>
- * <b>cmp</b>: The compare function. Supported values are "lte", "lt", "eq",
- * "neq", "gt", "gte". Default is "eq"<br>
- * <br>
- * <b>Compile time checks</b>:<br>
- * Key must be non empty<br>
- * Value must be able to convert to a "double"<br>
- * Compare string, if specified, must be one of "lte", "lt", "eq", "neq", "gt",
- * "gte"<br>
- * <br>
- * <b>Run time checks</b>:<br>
- * Does the incoming HashMap have the key, Is the value of the key a number<br>
- * <br>
- * @displayName Except Map
- * @category Math
- * @tags comparison, Number
- * @since 0.3.3
- */
-@Stateless
-public class ExceptMap<K, V extends Number> extends MatchMap<K, V>
-{
- /**
- * Output port for tuples that did not match; declares UnifierHashMap as its unifier.
- */
- public final transient DefaultOutputPort<HashMap<K, V>> except = new DefaultOutputPort<HashMap<K, V>>()
- {
- @Override
- public Unifier<HashMap<K, V>> getUnifier()
- {
- return new UnifierHashMap<K, V>();
- }
- };
-
- /**
- * Intentionally empty so that matched tuples are not emitted; per the
- * original note, calling super.tupleMatched() would emit the tuple.
- *
- * @param tuple the matched tuple; ignored
- */
- @Override
- public void tupleMatched(Map<K, V> tuple)
- {
- }
-
- /**
- * Emits a copy of the tuple, obtained from cloneTuple(), on the except port.
- * The copy lets users override cloneTuple in case objects are mutable.
- *
- * @param tuple the non matching tuple to copy and emit
- */
- @Override
- public void tupleNotMatched(Map<K, V> tuple)
- {
- except.emit(cloneTuple(tuple));
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/Quotient.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Quotient.java b/library/src/main/java/com/datatorrent/lib/math/Quotient.java
deleted file mode 100644
index ed08e86..0000000
--- a/library/src/main/java/com/datatorrent/lib/math/Quotient.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-import com.datatorrent.lib.util.BaseNumberValueOperator;
-
-/**
- * This operator adds all the values on "numerator" and "denominator" and emits quotient at end of window.
- * <p>
- * <br>
- * <b>StateFull : Yes </b>, Sum of values is taken over application window. <br>
- * <b>Partitions : No </b>, will yield wrong results, since values are
- * accumulated over application window. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>numerator</b>: expects V extends Number<br>
- * <b>denominator</b>: expects V extends Number<br>
- * <b>quotient</b>: emits Double<br>
- * <br>
- * <b>Properties : </b> <br>
- * <b>mult_by : </b>Multiply by value(default = 1). <br>
- * <br>
- * @displayName Quotient
- * @category Math
- * @tags division, sum, numeric
- * @since 0.3.3
- */
-@OperatorAnnotation(partitionable = false)
-public class Quotient<V extends Number> extends BaseNumberValueOperator<V>
-{
- protected double nval = 0.0;
- protected double dval = 0.0;
- int mult_by = 1;
-
- /**
- * Numerator values input port.
- */
- public final transient DefaultInputPort<V> numerator = new DefaultInputPort<V>()
- {
- /**
- * Adds to the numerator value
- */
- @Override
- public void process(V tuple)
- {
- nval += tuple.doubleValue();
- }
- };
-
- /**
- * Denominator values input port.
- */
- public final transient DefaultInputPort<V> denominator = new DefaultInputPort<V>()
- {
- /**
- * Adds to the denominator value
- */
- @Override
- public void process(V tuple)
- {
- dval += tuple.doubleValue();
- }
- };
-
- /**
- * Quotient output port.
- */
- public final transient DefaultOutputPort<V> quotient = new DefaultOutputPort<V>();
-
- public void setMult_by(int i)
- {
- mult_by = i;
- }
-
- /**
- * Generates tuple emits it as long as denominator is not 0. Clears internal
- * data
- */
- @Override
- public void endWindow()
- {
- if (dval == 0) {
- return;
- }
- double val = (nval / dval) * mult_by;
- quotient.emit(getValue(val));
- nval = 0.0;
- dval = 0.0;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/QuotientMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/QuotientMap.java b/library/src/main/java/com/datatorrent/lib/math/QuotientMap.java
deleted file mode 100644
index a10fe95..0000000
--- a/library/src/main/java/com/datatorrent/lib/math/QuotientMap.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.validation.constraints.Min;
-
-import org.apache.commons.lang.mutable.MutableDouble;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
-
-/**
- * Add all the values for each key on "numerator" and "denominator" and emits quotient at end of window for all keys in the denominator.
- * <p>
- * <br>
- * Application can set multiplication value for quotient(default = 1). <br>
- * Operator will calculate quotient of occurrence of key in numerator divided by
- * occurrence of key in denominator if countKey flag is true. <br>
- * Application can allow or block keys by setting filter key and inverse flag. <br>
- * <br>
- * <b>StateFull : Yes</b>, numerator/denominator values are summed over
- * application window. <br>
- * <b>Partitions : No, </b>, will yield wrong results, since values are summed
- * over app window. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>numerator</b>: expects Map<K,V extends Number><br>
- * <b>denominator</b>: expects Map<K,V extends Number><br>
- * <b>quotient</b>: emits HashMap<K,Double><br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>inverse :</b> if set to true the key in the filter will block tuple<br>
- * <b>filterBy :</b> List of keys to filter on<br>
- * <b>countkey :</b> Get quotient of occurrence of keys in numerator and
- * denominator. <br>
- * <b>mult_by :</b> Set multiply by constant value. <br>
- * <br>
- * @displayName Quotient Map
- * @category Math
- * @tags division, sum, map
- * @since 0.3.3
- */
-@OperatorAnnotation(partitionable = false)
-public class QuotientMap<K, V extends Number> extends
- BaseNumberKeyValueOperator<K, V>
-{
- /**
- * Numerator key/sum value map.
- */
- protected HashMap<K, MutableDouble> numerators = new HashMap<K, MutableDouble>();
-
- /**
- * Denominator key/sum value map.
- */
- protected HashMap<K, MutableDouble> denominators = new HashMap<K, MutableDouble>();
-
- /**
- * Count occurrence of keys if set to true.
- */
- boolean countkey = false;
-
- /**
- * Quotient multiply by value.
- */
- int mult_by = 1;
-
- /**
- * Numerator input port.
- */
- public final transient DefaultInputPort<Map<K, V>> numerator = new DefaultInputPort<Map<K, V>>()
- {
- /**
- * Added tuple to the numerator hash
- */
- @Override
- public void process(Map<K, V> tuple)
- {
- addTuple(tuple, numerators);
- }
- };
-
- /**
- * Denominator input port.
- */
- public final transient DefaultInputPort<Map<K, V>> denominator = new DefaultInputPort<Map<K, V>>()
- {
- /**
- * Added tuple to the denominator hash
- */
- @Override
- public void process(Map<K, V> tuple)
- {
- addTuple(tuple, denominators);
- }
- };
-
- /**
- * Quotient output port.
- */
- public final transient DefaultOutputPort<HashMap<K, Double>> quotient = new DefaultOutputPort<HashMap<K, Double>>();
-
- /**
- * Add tuple to nval/dval map.
- *
- * @param tuple
- * key/value map on input port.
- * @param map
- * key/summed value map.
- */
- public void addTuple(Map<K, V> tuple, Map<K, MutableDouble> map)
- {
- for (Map.Entry<K, V> e : tuple.entrySet()) {
- addEntry(e.getKey(), e.getValue(), map);
- }
- }
-
- /**
- * Add/Update entry to key/sum value map.
- *
- * @param key
- * name.
- * @param value
- * value for key.
- * @param map
- * numerator/denominator key/sum map.
- */
- public void addEntry(K key, V value, Map<K, MutableDouble> map)
- {
- if (!doprocessKey(key) || (value == null)) {
- return;
- }
- MutableDouble val = map.get(key);
- if (val == null) {
- if (countkey) {
- val = new MutableDouble(1.00);
- } else {
- val = new MutableDouble(value.doubleValue());
- }
- } else {
- if (countkey) {
- val.increment();
- } else {
- val.add(value.doubleValue());
- }
- }
- map.put(cloneKey(key), val);
- }
-
- /**
- * getter for mult_by
- *
- * @return mult_by
- */
-
- @Min(0)
- public int getMult_by()
- {
- return mult_by;
- }
-
- /**
- * getter for countkey
- *
- * @return countkey
- */
- public boolean getCountkey()
- {
- return countkey;
- }
-
- /**
- * Setter for mult_by
- *
- * @param i
- */
- public void setMult_by(int i)
- {
- mult_by = i;
- }
-
- /**
- * setter for countkey
- *
- * @param i
- * sets countkey
- */
- public void setCountkey(boolean i)
- {
- countkey = i;
- }
-
- /**
- * Generates tuples for each key and emits them. Only keys that are in the
- * denominator are iterated on If the key is only in the numerator, it gets
- * ignored (cannot do divide by 0) Clears internal data
- */
- @Override
- public void endWindow()
- {
- HashMap<K, Double> tuples = new HashMap<K, Double>();
- for (Map.Entry<K, MutableDouble> e : denominators.entrySet()) {
- MutableDouble nval = numerators.get(e.getKey());
- if (nval == null) {
- tuples.put(e.getKey(), new Double(0.0));
- } else {
- tuples.put(e.getKey(), new Double((nval.doubleValue() / e.getValue()
- .doubleValue()) * mult_by));
- }
- }
- if (!tuples.isEmpty()) {
- quotient.emit(tuples);
- }
- numerators.clear();
- denominators.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/math/SumCountMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/SumCountMap.java b/library/src/main/java/com/datatorrent/lib/math/SumCountMap.java
deleted file mode 100644
index c2d8465..0000000
--- a/library/src/main/java/com/datatorrent/lib/math/SumCountMap.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.lang.mutable.MutableDouble;
-import org.apache.commons.lang.mutable.MutableInt;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
-import com.datatorrent.lib.util.UnifierHashMapInteger;
-import com.datatorrent.lib.util.UnifierHashMapSumKeys;
-
-/**
- * Emits the sum and count of values for each key at the end of window.
- * <p>
- * Application accumulate sum across streaming window by setting cumulative flag
- * to true. <br>
- * This is an end of window operator<br>
- * <br>
- * <b>StateFull : Yes</b>, Sum is computed over application window and streaming
- * window. <br>
- * <b>Partitions : Yes</b>, Sum is unified at output port. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects Map<K,V extends Number><br>
- * <b>sum</b>: emits HashMap<K,V><br>
- * <b>count</b>: emits HashMap<K,Integer></b><br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>inverse</b>: if set to true the key in the filter will block tuple<br>
- * <b>filterBy</b>: List of keys to filter on<br>
- * <b>cumulative</b>: boolean flag, if set the sum is not cleared at the end of
- * window, <br>
- * hence generating cumulative sum across streaming windows. Default is false.<br>
- * <br>
- * @displayName Sum Count Map
- * @category Math
- * @tags number, sum, counting, map
- * @since 0.3.3
- */
-public class SumCountMap<K, V extends Number> extends
- BaseNumberKeyValueOperator<K, V>
-{
- /**
- * Key/double sum map.
- */
- protected HashMap<K, MutableDouble> sums = new HashMap<K, MutableDouble>();
-
- /**
- * Key/integer sum map.
- */
- protected HashMap<K, MutableInt> counts = new HashMap<K, MutableInt>();
-
- /**
- * Cumulative sum flag.
- */
- protected boolean cumulative = false;
-
- /**
- * Input port that takes a map. It adds the values for each key and counts the number of occurrences for each key.
- */
- public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>()
- {
- /**
- * For each tuple (a HashMap of keys,val pairs) Adds the values for each
- * key, Counts the number of occurrences of each key
- */
- @Override
- public void process(Map<K, V> tuple)
- {
- for (Map.Entry<K, V> e : tuple.entrySet()) {
- K key = e.getKey();
- if (!doprocessKey(key)) {
- continue;
- }
- if (sum.isConnected()) {
- MutableDouble val = sums.get(key);
- if (val == null) {
- val = new MutableDouble(e.getValue().doubleValue());
- } else {
- val.add(e.getValue().doubleValue());
- }
- sums.put(cloneKey(key), val);
- }
- if (SumCountMap.this.count.isConnected()) {
- MutableInt count = counts.get(key);
- if (count == null) {
- count = new MutableInt(0);
- counts.put(cloneKey(key), count);
- }
- count.increment();
- }
- }
- }
- };
-
- /**
- * Key,sum map output port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<HashMap<K, V>> sum = new DefaultOutputPort<HashMap<K, V>>()
- {
- @Override
- public Unifier<HashMap<K, V>> getUnifier()
- {
- return new UnifierHashMapSumKeys<K, V>();
- }
- };
-
- /**
- * Key,double sum map output port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<HashMap<K, Double>> sumDouble = new DefaultOutputPort<HashMap<K, Double>>()
- {
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public Unifier<HashMap<K, Double>> getUnifier()
- {
- UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Double>();
- ret.setType(Double.class);
- return ret;
- }
- };
-
- /**
- * Key,integer sum output port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<HashMap<K, Integer>> sumInteger = new DefaultOutputPort<HashMap<K, Integer>>()
- {
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public Unifier<HashMap<K, Integer>> getUnifier()
- {
- UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Integer>();
- ret.setType(Integer.class);
- return ret;
- }
- };
-
-
- /**
- * Key,long sum output port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<HashMap<K, Long>> sumLong = new DefaultOutputPort<HashMap<K, Long>>()
- {
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public Unifier<HashMap<K, Long>> getUnifier()
- {
- UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Long>();
- ret.setType(Long.class);
- return ret;
- }
- };
-
- /**
- * Key,short sum output port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<HashMap<K, Short>> sumShort = new DefaultOutputPort<HashMap<K, Short>>()
- {
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public Unifier<HashMap<K, Short>> getUnifier()
- {
- UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Short>();
- ret.setType(Short.class);
- return ret;
- }
- };
-
- /**
- * Key,float sum output port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<HashMap<K, Float>> sumFloat = new DefaultOutputPort<HashMap<K, Float>>()
- {
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public Unifier<HashMap<K, Float>> getUnifier()
- {
- UnifierHashMapSumKeys ret = new UnifierHashMapSumKeys<K, Float>();
- ret.setType(Float.class);
- return ret;
- }
- };
-
- /**
- * Key,integer sum output port.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<HashMap<K, Integer>> count = new DefaultOutputPort<HashMap<K, Integer>>()
- {
- @Override
- public Unifier<HashMap<K, Integer>> getUnifier()
- {
- return new UnifierHashMapInteger<K>();
- }
- };
-
- /**
- * Get cumulative flag.
- *
- * @return cumulative flag
- */
- public boolean isCumulative()
- {
- return cumulative;
- }
-
- /**
- * set cumulative flag.
- *
- * @param cumulative
- * input flag
- */
- public void setCumulative(boolean cumulative)
- {
- this.cumulative = cumulative;
- }
-
- /**
- * Emits on all ports that are connected. Data is precomputed during process
- * on input port endWindow just emits it for each key Clears the internal data
- * before return
- */
- @Override
- public void endWindow()
- {
-
- // Should allow users to send each key as a separate tuple to load balance
- // This is an aggregate node, so load balancing would most likely not be
- // needed
-
- HashMap<K, V> tuples = new HashMap<K, V>();
- HashMap<K, Integer> ctuples = new HashMap<K, Integer>();
- HashMap<K, Double> dtuples = new HashMap<K, Double>();
- HashMap<K, Integer> ituples = new HashMap<K, Integer>();
- HashMap<K, Float> ftuples = new HashMap<K, Float>();
- HashMap<K, Long> ltuples = new HashMap<K, Long>();
- HashMap<K, Short> stuples = new HashMap<K, Short>();
-
- for (Map.Entry<K, MutableDouble> e : sums.entrySet()) {
- K key = e.getKey();
- MutableDouble val = e.getValue();
- tuples.put(key, getValue(val.doubleValue()));
- dtuples.put(key, val.doubleValue());
- ituples.put(key, val.intValue());
- ftuples.put(key, val.floatValue());
- ltuples.put(key, val.longValue());
- stuples.put(key, val.shortValue());
- // ctuples.put(key, counts.get(e.getKey()).toInteger());
- MutableInt c = counts.get(e.getKey());
- if (c != null) {
- ctuples.put(key, c.toInteger());
- }
- }
-
- sum.emit(tuples);
- sumDouble.emit(dtuples);
- sumInteger.emit(ituples);
- sumLong.emit(ltuples);
- sumShort.emit(stuples);
- sumFloat.emit(ftuples);
- count.emit(ctuples);
- clearCache();
- }
-
- /**
- * Clear sum maps.
- */
- private void clearCache()
- {
- if (!cumulative) {
- sums.clear();
- counts.clear();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/AbstractSqlStreamOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/AbstractSqlStreamOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/AbstractSqlStreamOperator.java
deleted file mode 100644
index e3bba8a..0000000
--- a/library/src/main/java/com/datatorrent/lib/streamquery/AbstractSqlStreamOperator.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-
-/**
- * A base implementation of a BaseOperator that is a sql stream operator. Subclasses should provide the
- implementation of how to process the tuples.
- * <p>
- * Abstract sql db input operator.
- * <p>
- * @displayName Abstract Sql Stream
- * @category Stream Manipulators
- * @tags sql operator
- * @since 0.3.2
- */
-public abstract class AbstractSqlStreamOperator extends BaseOperator
-{
- public static class InputSchema
- {
- public static class ColumnInfo
- {
- public String type;
- public int bindIndex = 0;
- public boolean isColumnIndex = false;
- }
-
- /**
- * the name of the input "table"
- */
- public String name;
- /**
- * key is the name of the column, and value is the SQL type
- */
- public HashMap<String, ColumnInfo> columnInfoMap = new HashMap<String, ColumnInfo>();
-
- public InputSchema()
- {
- }
-
- public InputSchema(String name)
- {
- this.name = name;
- }
-
- public void setColumnInfo(String columnName, String columnType, boolean isColumnIndex)
- {
- ColumnInfo t = new ColumnInfo();
- t.type = columnType;
- t.isColumnIndex = isColumnIndex;
- columnInfoMap.put(columnName, t);
- }
-
- }
-
- protected String statement;
- protected ArrayList<InputSchema> inputSchemas = new ArrayList<InputSchema>(5);
- protected transient ArrayList<Object> bindings;
-
- /**
- * Input bindings port that takes an arraylist of objects.
- */
- @InputPortFieldAnnotation(optional = true)
- public final transient DefaultInputPort<ArrayList<Object>> bindingsPort = new DefaultInputPort<ArrayList<Object>>()
- {
- @Override
- public void process(ArrayList<Object> tuple)
- {
- bindings = tuple;
- }
-
- };
-
- /**
- * Input port in1 that takes a hashmap of <string,object>.
- */
- public final transient DefaultInputPort<HashMap<String, Object>> in1 = new DefaultInputPort<HashMap<String, Object>>()
- {
- @Override
- public void process(HashMap<String, Object> tuple)
- {
- processTuple(0, tuple);
- }
-
- };
-
- /**
- * Input port in2 that takes a hashmap of <string,object>.
- */
- @InputPortFieldAnnotation(optional = true)
- public final transient DefaultInputPort<HashMap<String, Object>> in2 = new DefaultInputPort<HashMap<String, Object>>()
- {
- @Override
- public void process(HashMap<String, Object> tuple)
- {
- processTuple(1, tuple);
- }
-
- };
-
- /**
- * Input port in3 that takes a hashmap of <string,object>.
- */
- @InputPortFieldAnnotation(optional = true)
- public final transient DefaultInputPort<HashMap<String, Object>> in3 = new DefaultInputPort<HashMap<String, Object>>()
- {
- @Override
- public void process(HashMap<String, Object> tuple)
- {
- processTuple(2, tuple);
- }
-
- };
-
- /**
- * Input port in4 that takes a hashmap of <string,object>.
- */
- @InputPortFieldAnnotation(optional = true)
- public final transient DefaultInputPort<HashMap<String, Object>> in4 = new DefaultInputPort<HashMap<String, Object>>()
- {
- @Override
- public void process(HashMap<String, Object> tuple)
- {
- processTuple(3, tuple);
- }
-
- };
-
- /**
- * Input port in5 that takes a hashmap of <string,object>.
- */
- @InputPortFieldAnnotation(optional = true)
- public final transient DefaultInputPort<HashMap<String, Object>> in5 = new DefaultInputPort<HashMap<String, Object>>()
- {
- @Override
- public void process(HashMap<String, Object> tuple)
- {
- processTuple(4, tuple);
- }
-
- };
-
- /**
- * Output result port that emits a hashmap of <string,object>.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<HashMap<String, Object>> result = new DefaultOutputPort<HashMap<String, Object>>();
-
- public void setStatement(String statement)
- {
- this.statement = statement;
- }
-
- public String getStatement()
- {
- return this.statement;
- }
-
- public void setInputSchema(int inputPortIndex, InputSchema inputSchema)
- {
- inputSchemas.add(inputPortIndex, inputSchema);
- }
-
- public abstract void processTuple(int tableNum, HashMap<String, Object> tuple);
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/DeleteOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/DeleteOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/DeleteOperator.java
deleted file mode 100644
index 77c7522..0000000
--- a/library/src/main/java/com/datatorrent/lib/streamquery/DeleteOperator.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.Map;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.lib.streamquery.condition.Condition;
-
-/**
- * An implementation of BaseOperator that provides sql delete query semantic on live data stream. <br>
- * <p>
- * Stream rows passing condition are emitted on output port stream. <br>
- * <br>
- * <b>StateFull : NO,</b> all row data is processed in current time window. <br>
- * <b>Partitions : Yes, </b> No Input dependency among input rows. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b> inport : </b> Input hash map(row) port, expects
- * HashMap<String,Object><<br>
- * <b> outport : </b> Output hash map(row) port, emits
- * HashMap<String,Object><br>
- * <br>
- * <b> Properties : <b> <br>
- * <b> condition : </b> Select condition for selecting rows. <br>
- * <b> columns : </b> Column names/aggregate functions for select. <br>
- * <br>
- * @displayName Delete
- * @category Stream Manipulators
- * @tags sql delete operator
- * @since 0.3.3
- */
-public class DeleteOperator extends BaseOperator
-{
-
- /**
- * condition.
- */
- private Condition condition = null;
-
- /**
- * set condition.
- */
- public void setCondition(Condition condition)
- {
- this.condition = condition;
- }
-
- /**
- * Input port that takes a map of <string,object>.
- */
- public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
- {
-
- @Override
- public void process(Map<String, Object> tuple)
- {
- if ((condition != null) && (!condition.isValidRow(tuple))) {
- outport.emit(tuple);
- }
- }
- };
-
- /**
- * Output port emits a map of <string,object>.
- */
- public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>();
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/DerbySqlStreamOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/DerbySqlStreamOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/DerbySqlStreamOperator.java
deleted file mode 100644
index 2fe8bc3..0000000
--- a/library/src/main/java/com/datatorrent/lib/streamquery/DerbySqlStreamOperator.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.lib.streamquery.AbstractSqlStreamOperator.InputSchema.ColumnInfo;
-
-/**
- * An implementation of AbstractSqlStreamOperator that provides embedded derby sql input operator.
- * <p>
- * @displayName Derby Sql Stream
- * @category Stream Manipulators
- * @tags sql, in-memory, input operator
- * @since 0.3.2
- */
-public class DerbySqlStreamOperator extends AbstractSqlStreamOperator
-{
- protected transient ArrayList<PreparedStatement> insertStatements = new ArrayList<PreparedStatement>(5);
- protected List<String> execStmtStringList = new ArrayList<String>();
- protected transient ArrayList<PreparedStatement> execStatements = new ArrayList<PreparedStatement>(5);
- protected transient ArrayList<PreparedStatement> deleteStatements = new ArrayList<PreparedStatement>(5);
- protected transient Connection db;
-
- public void addExecStatementString(String stmt)
- {
- this.execStmtStringList.add(stmt);
- }
-
-
- @Override
- public void setup(OperatorContext context)
- {
- System.setProperty("derby.stream.error.file", "/dev/null");
- try {
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance();
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
-
- String connUrl = "jdbc:derby:memory:MALHAR_TEMP;create=true";
- PreparedStatement st;
-
- try {
- db = DriverManager.getConnection(connUrl);
- // create the temporary tables here
- for (int i = 0; i < inputSchemas.size(); i++) {
- InputSchema inputSchema = inputSchemas.get(i);
- if (inputSchema == null || inputSchema.columnInfoMap.isEmpty()) {
- continue;
- }
- String columnSpec = "";
- String columnNames = "";
- String insertQuestionMarks = "";
- int j = 0;
- for (Map.Entry<String, ColumnInfo> entry : inputSchema.columnInfoMap.entrySet()) {
- if (!columnSpec.isEmpty()) {
- columnSpec += ",";
- columnNames += ",";
- insertQuestionMarks += ",";
- }
- columnSpec += entry.getKey();
- columnSpec += " ";
- columnSpec += entry.getValue().type;
- columnNames += entry.getKey();
- insertQuestionMarks += "?";
- entry.getValue().bindIndex = ++j;
- }
- String createTempTableStmt =
- "DECLARE GLOBAL TEMPORARY TABLE SESSION." + inputSchema.name + "(" + columnSpec + ") NOT LOGGED";
- st = db.prepareStatement(createTempTableStmt);
- st.execute();
- st.close();
-
- String insertStmt = "INSERT INTO SESSION." + inputSchema.name + " (" + columnNames + ") VALUES ("
- + insertQuestionMarks + ")";
-
- insertStatements.add(i, db.prepareStatement(insertStmt));
- deleteStatements.add(i, db.prepareStatement("DELETE FROM SESSION." + inputSchema.name));
- }
- for (String stmtStr : execStmtStringList) {
- execStatements.add(db.prepareStatement(stmtStr));
- }
- } catch (SQLException ex) {
- throw new RuntimeException(ex);
- }
- }
-
- @Override
- public void beginWindow(long windowId)
- {
- try {
- db.setAutoCommit(false);
- } catch (SQLException ex) {
- throw new RuntimeException(ex);
- }
- }
-
- @Override
- public void processTuple(int tableNum, HashMap<String, Object> tuple)
- {
- InputSchema inputSchema = inputSchemas.get(tableNum);
-
- PreparedStatement insertStatement = insertStatements.get(tableNum);
- try {
- for (Map.Entry<String, Object> entry : tuple.entrySet()) {
- ColumnInfo t = inputSchema.columnInfoMap.get(entry.getKey());
- if (t != null && t.bindIndex != 0) {
- insertStatement.setString(t.bindIndex, entry.getValue().toString());
- }
- }
-
- insertStatement.executeUpdate();
- insertStatement.clearParameters();
- } catch (SQLException ex) {
- throw new RuntimeException(ex);
- }
- }
-
- @Override
- public void endWindow()
- {
- try {
- db.commit();
- if (bindings != null) {
- for (int i = 0; i < bindings.size(); i++) {
- for (PreparedStatement stmt : execStatements) {
- stmt.setString(i, bindings.get(i).toString());
- }
- }
- }
-
- for (PreparedStatement stmt : execStatements) {
- executePreparedStatement(stmt);
- }
- for (PreparedStatement st : deleteStatements) {
- st.executeUpdate();
- st.clearParameters();
- }
- } catch (SQLException ex) {
- throw new RuntimeException(ex);
- }
- bindings = null;
- }
-
- private void executePreparedStatement(PreparedStatement statement) throws SQLException
- {
- ResultSet res = statement.executeQuery();
- ResultSetMetaData resmeta = res.getMetaData();
- int columnCount = resmeta.getColumnCount();
- while (res.next()) {
- HashMap<String, Object> resultRow = new HashMap<String, Object>();
- for (int i = 1; i <= columnCount; i++) {
- resultRow.put(resmeta.getColumnName(i), res.getObject(i));
- }
- this.result.emit(resultRow);
- }
- statement.clearParameters();
- }
-
- @Override
- public void teardown()
- {
- try {
- db.close();
- } catch (SQLException ex) {
- throw new RuntimeException(ex);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/GroupByHavingOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/GroupByHavingOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/GroupByHavingOperator.java
deleted file mode 100644
index 1821953..0000000
--- a/library/src/main/java/com/datatorrent/lib/streamquery/GroupByHavingOperator.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.validation.constraints.NotNull;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.lib.streamquery.condition.Condition;
-import com.datatorrent.lib.streamquery.condition.HavingCondition;
-import com.datatorrent.lib.streamquery.function.FunctionIndex;
-import com.datatorrent.lib.streamquery.index.ColumnIndex;
-
-/**
- * An implementation of BaseOperator that provides SQL group-by querying semantics on a live data stream. <br>
- * <p>
- * Rows satisfying the select condition are collected during the window. At end of window
- * they are grouped by the group-by columns and handed to the aggregate column functions. <br>
- * If having conditions are specified for the aggregates, a group is emitted only when all
- * of them are satisfied. One map, filled by the group-by column indexes and the aggregate
- * indexes, is emitted on the output port per group. <br>
- * <br>
- * <b>Stateful : Yes,</b> Operator aggregates input over application window. <br>
- * <b>Partitions : No, </b> will yield wrong result(s). <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b> inport : </b> Input hash map(row) port, expects
- * HashMap&lt;String,Object&gt;<br>
- * <b> outport : </b> Output hash map(row) port, emits
- * HashMap&lt;String,Object&gt;<br>
- * <br>
- * <b> Properties : </b> <br>
- * <b> condition : </b> Select (where) condition; rows that fail it are dropped. <br>
- * <b> columnGroupIndexes : </b> Group by names list. <br>
- * <b> indexes : </b> Select column indexes. <br>
- * <b> havingConditions : </b> Having filter conditions for aggregate(s). <br>
- * <br>
- * @displayName GroupBy Having Operator
- * @category Stream Manipulators
- * @tags sql, groupby operator, condition, index
- * @since 0.3.4
- */
-@OperatorAnnotation(partitionable = false)
-public class GroupByHavingOperator extends BaseOperator
-{
-
-  /**
-   * Aggregate indexes.
-   */
-  private ArrayList<FunctionIndex> aggregates = new ArrayList<FunctionIndex>();
-
-  /**
-   * Column, group by names.
-   */
-  private ArrayList<ColumnIndex> columnGroupIndexes = new ArrayList<ColumnIndex>();
-
-  /**
-   * Where condition; {@code null} accepts every row.
-   */
-  private Condition condition;
-
-  /**
-   * Having aggregate conditions.
-   */
-  private ArrayList<HavingCondition> havingConditions = new ArrayList<HavingCondition>();
-
-  /**
-   * Table rows collected during the current window.
-   */
-  private ArrayList<Map<String, Object>> rows = new ArrayList<Map<String, Object>>();
-
-  /**
-   * Adds an aggregate function whose value is appended to every emitted group.
-   *
-   * @param index aggregate function index
-   */
-  public void addAggregateIndex(@NotNull FunctionIndex index)
-  {
-    aggregates.add(index);
-  }
-
-  /**
-   * Adds a column to group by.
-   *
-   * @param index group-by column index
-   */
-  public void addColumnGroupByIndex(@NotNull ColumnIndex index)
-  {
-    columnGroupIndexes.add(index);
-  }
-
-  /**
-   * Adds a having condition that every emitted group must satisfy.
-   *
-   * @param condition having condition
-   */
-  public void addHavingCondition(@NotNull HavingCondition condition)
-  {
-    havingConditions.add(condition);
-  }
-
-  /**
-   * @param condition where condition applied to each incoming row
-   */
-  public void setCondition(Condition condition)
-  {
-    this.condition = condition;
-  }
-
-  /**
-   * Input port that takes a map of &lt;string,object&gt;.
-   */
-  public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
-  {
-
-    @Override
-    public void process(Map<String, Object> tuple)
-    {
-      // Rows rejected by the where condition never reach the table.
-      if ((condition != null) && (!condition.isValidRow(tuple))) {
-        return;
-      }
-      rows.add(tuple);
-    }
-  };
-
-  /**
-   * Output port that emits a map of &lt;string,object&gt;.
-   */
-  public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>();
-
-  /**
-   * Groups the rows collected in this window, emits the qualifying groups and resets the table.
-   */
-  @Override
-  public void endWindow()
-  {
-    try {
-      emitGroups();
-    } finally {
-      // Always start the next window with an empty table, including when group
-      // processing stops early; otherwise this window's rows would be aggregated again.
-      rows = new ArrayList<Map<String, Object>>();
-    }
-  }
-
-  /**
-   * Groups {@code rows} by the group-by columns and emits one result per group that
-   * satisfies every having condition. Nothing is emitted when no group-by column is set.
-   */
-  private void emitGroups()
-  {
-    // group names
-    if (columnGroupIndexes.isEmpty()) {
-      return;
-    }
-
-    // group rows
-    HashMap<MultiKeyCompare, ArrayList<Map<String, Object>>> groups =
-        new HashMap<MultiKeyCompare, ArrayList<Map<String, Object>>>();
-    for (Map<String, Object> row : rows) {
-      MultiKeyCompare key = new MultiKeyCompare();
-      for (ColumnIndex index : columnGroupIndexes) {
-        key.addCompareKey(row.get(index.getColumn()));
-      }
-      ArrayList<Map<String, Object>> subRows = groups.get(key);
-      if (subRows == null) {
-        subRows = new ArrayList<Map<String, Object>>();
-        groups.put(key, subRows);
-      }
-      subRows.add(row);
-    }
-
-    // Iterate over groups and emit aggregate values
-    for (ArrayList<Map<String, Object>> subRows : groups.values()) {
-
-      // group-by columns are taken from the first row of the group
-      Map<String, Object> result = new HashMap<String, Object>();
-      for (ColumnIndex index : columnGroupIndexes) {
-        index.filter(subRows.get(0), result);
-      }
-
-      // append aggregate values; a failing aggregate is reported and skipped (best effort)
-      for (FunctionIndex aggregate : aggregates) {
-        try {
-          aggregate.filter(subRows, result);
-        } catch (Exception e) {
-          e.printStackTrace();
-        }
-      }
-
-      // check valid having aggregate
-      boolean isValidHaving = true;
-      for (HavingCondition having : havingConditions) {
-        try {
-          isValidHaving &= having.isValidAggregate(subRows);
-        } catch (Exception e) {
-          // A having condition that cannot be evaluated stops emission for this window.
-          e.printStackTrace();
-          return;
-        }
-      }
-      if (isValidHaving) {
-        outport.emit(result);
-      }
-    }
-  }
-
-  /**
-   * Composite grouping key made of the group-by column values of one row.
-   * Column values may be {@code null} (row without that column).
-   */
-  @SuppressWarnings("rawtypes")
-  private static class MultiKeyCompare implements Comparable
-  {
-
-    /**
-     * Compare keys, in group-by column order.
-     */
-    ArrayList<Object> compareKeys = new ArrayList<Object>();
-
-    /**
-     * Two keys are equal when they hold the same number of values and the values are
-     * pairwise equal; any other object (or {@code null}) is never equal.
-     */
-    @Override
-    public boolean equals(Object other)
-    {
-      if (this == other) {
-        return true;
-      }
-      if (!(other instanceof MultiKeyCompare)) {
-        return false;
-      }
-      // List equality checks size and compares elements pairwise, treating null as equal to null.
-      return compareKeys.equals(((MultiKeyCompare)other).compareKeys);
-    }
-
-    /**
-     * Sum of the hash codes of the key values; a {@code null} value contributes 0.
-     */
-    @Override
-    public int hashCode()
-    {
-      int hashCode = 0;
-      for (Object key : compareKeys) {
-        if (key != null) {
-          hashCode += key.hashCode();
-        }
-      }
-      return hashCode;
-    }
-
-    /**
-     * Returns 0 for an equal key and -1 otherwise. This only distinguishes equal from
-     * unequal keys and does not define an ordering.
-     */
-    @Override
-    public int compareTo(Object other)
-    {
-      if (this.equals(other)) {
-        return 0;
-      }
-      return -1;
-    }
-
-    /**
-     * Add compare key.
-     *
-     * @param value column value to append to the key, may be {@code null}
-     */
-    public void addCompareKey(Object value)
-    {
-      compareKeys.add(value);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/InnerJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/InnerJoinOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/InnerJoinOperator.java
deleted file mode 100644
index 883329e..0000000
--- a/library/src/main/java/com/datatorrent/lib/streamquery/InnerJoinOperator.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-import com.datatorrent.lib.streamquery.condition.Condition;
-import com.datatorrent.lib.streamquery.index.Index;
-
-/**
- * An implementation of Operator that reads table row data from two table data input ports. <br>
- * <p>
- * Each arriving row is stored and joined against the rows already received on the other
- * port in the current window; every joined row is emitted on the output port.
- * <br>
- * <b>Stateful : Yes,</b> Operator collects input over application window. <br>
- * <b>Partitions : No, </b> will yield wrong result(s). <br>
- * <br>
- * <b>Ports : </b> <br>
- * <b> inport1 : </b> Input port for table 1, expects Map&lt;String, Object&gt; <br>
- * <b> inport2 : </b> Input port for table 2, expects Map&lt;String, Object&gt; <br>
- * <b> outport : </b> Output joined row port, emits Map&lt;String, Object&gt; <br>
- * <br>
- * <b> Properties : </b>
- * <b> joinCondition : </b> Join condition for table rows. <br>
- * <b> table1Columns : </b> Columns to be selected from table1. <br>
- * <b> table2Columns : </b> Columns to be selected from table2. <br>
- * <br>
- * @displayName Inner join
- * @category Stream Manipulators
- * @tags sql, inner join operator
- *
- * @since 0.3.3
- */
-@OperatorAnnotation(partitionable = false)
-public class InnerJoinOperator implements Operator
-{
-
-  /**
-   * Join condition; when {@code null} every pair of rows is joined.
-   */
-  protected Condition joinCondition;
-
-  /**
-   * Columns selected from table1 rows.
-   */
-  private ArrayList<Index> table1Columns = new ArrayList<Index>();
-
-  /**
-   * Columns selected from table2 rows.
-   */
-  private ArrayList<Index> table2Columns = new ArrayList<Index>();
-
-  /**
-   * Rows received on input port 1 during the current window.
-   */
-  protected ArrayList<Map<String, Object>> table1;
-
-  /**
-   * Rows received on input port 2 during the current window.
-   */
-  protected ArrayList<Map<String, Object>> table2;
-
-  /**
-   * Input port 1 that takes a map of &lt;string,object&gt;.
-   */
-  public final transient DefaultInputPort<Map<String, Object>> inport1 = new DefaultInputPort<Map<String, Object>>()
-  {
-    @Override
-    public void process(Map<String, Object> tuple)
-    {
-      table1.add(tuple);
-      // Pair the new table1 row with every table2 row seen so far.
-      for (Map<String, Object> other : table2) {
-        if ((joinCondition != null) && !joinCondition.isValidJoin(tuple, other)) {
-          continue;
-        }
-        joinRows(tuple, other);
-      }
-    }
-  };
-
-  /**
-   * Input port 2 that takes a map of &lt;string,object&gt;.
-   */
-  public final transient DefaultInputPort<Map<String, Object>> inport2 = new DefaultInputPort<Map<String, Object>>()
-  {
-    @Override
-    public void process(Map<String, Object> tuple)
-    {
-      table2.add(tuple);
-      // Pair the new table2 row with every table1 row seen so far.
-      for (Map<String, Object> other : table1) {
-        if ((joinCondition != null) && !joinCondition.isValidJoin(other, tuple)) {
-          continue;
-        }
-        joinRows(other, tuple);
-      }
-    }
-  };
-
-  /**
-   * Output port that emits a map of &lt;string,object&gt;.
-   */
-  public final transient DefaultOutputPort<Map<String, Object>> outport =
-      new DefaultOutputPort<Map<String, Object>>();
-
-  /**
-   * Creates the two empty row tables.
-   */
-  @Override
-  public void setup(OperatorContext arg0)
-  {
-    table1 = new ArrayList<Map<String, Object>>();
-    table2 = new ArrayList<Map<String, Object>>();
-  }
-
-  /**
-   * Nothing to release.
-   */
-  @Override
-  public void teardown()
-  {
-  }
-
-  /**
-   * No per-window initialization is needed.
-   */
-  @Override
-  public void beginWindow(long arg0)
-  {
-  }
-
-  /**
-   * Discards the rows of both tables so the next window starts empty.
-   */
-  @Override
-  public void endWindow()
-  {
-    table1.clear();
-    table2.clear();
-  }
-
-  /**
-   * @return the joinCondition
-   */
-  public Condition getJoinCondition()
-  {
-    return joinCondition;
-  }
-
-  /**
-   * Sets the join condition. The original documentation notes that only equal join is
-   * currently supported.
-   * @param joinCondition joinCondition
-   */
-  public void setJoinCondition(Condition joinCondition)
-  {
-    this.joinCondition = joinCondition;
-  }
-
-  /**
-   * Select table1 column name.
-   */
-  public void selectTable1Column(Index column)
-  {
-    table1Columns.add(column);
-  }
-
-  /**
-   * Select table2 column name.
-   */
-  public void selectTable2Column(Index column)
-  {
-    table2Columns.add(column);
-  }
-
-  /**
-   * Join row from table1 and table2 and emit the joined row.
-   * A {@code null} row contributes no columns.
-   */
-  protected void joinRows(Map<String, Object> row1, Map<String, Object> row2)
-  {
-    Map<String, Object> joined = new HashMap<String, Object>();
-    project(table1Columns, row1, joined);
-    project(table2Columns, row2, joined);
-    outport.emit(joined);
-  }
-
-  /**
-   * Applies every selected column index to {@code row}, collecting into {@code target};
-   * does nothing when {@code row} is {@code null}.
-   */
-  private static void project(ArrayList<Index> columns, Map<String, Object> row, Map<String, Object> target)
-  {
-    if (row == null) {
-      return;
-    }
-    for (Index column : columns) {
-      column.filter(row, target);
-    }
-  }
-
-}