You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/10/11 04:30:06 UTC
[1/2] apex-malhar git commit: APEXMALHAR-2240 Implement Windowed Merge Operator for join support.
Repository: apex-malhar
Updated Branches:
refs/heads/master 96d216e80 -> aeb10f33d
APEXMALHAR-2240 Implement Windowed Merge Operator for join support.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/92bd7323
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/92bd7323
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/92bd7323
Branch: refs/heads/master
Commit: 92bd732325c9d8b8d6241aea398bb62aaaec7b91
Parents: 96d216e
Author: Shunxin <lu...@hotmail.com>
Authored: Mon Oct 10 18:47:59 2016 -0700
Committer: Shunxin <lu...@hotmail.com>
Committed: Mon Oct 10 19:07:24 2016 -0700
----------------------------------------------------------------------
.../malhar/lib/window/JoinAccumulation.java | 67 ------
.../malhar/lib/window/MergeAccumulation.java | 40 ++++
.../lib/window/MergeWindowedOperator.java | 30 +++
.../malhar/lib/window/accumulation/CoGroup.java | 45 ++++
.../lib/window/accumulation/InnerJoin.java | 115 ++++++++++
.../lib/window/accumulation/PojoInnerJoin.java | 187 +++++++++++++++++
.../impl/AbstractWindowedMergeOperator.java | 123 +++++++++++
.../window/impl/AbstractWindowedOperator.java | 87 ++++----
.../impl/KeyedWindowedMergeOperatorImpl.java | 120 +++++++++++
.../window/impl/KeyedWindowedOperatorImpl.java | 134 ++++++------
.../window/impl/WindowedMergeOperatorImpl.java | 112 ++++++++++
.../lib/window/accumulation/CoGroupTest.java | 60 ++++++
.../lib/window/accumulation/InnerJoinTest.java | 57 +++++
.../window/accumulation/PojoInnerJoinTest.java | 133 ++++++++++++
...yedWindowedMergeOperatorTestApplication.java | 208 +++++++++++++++++++
.../window/impl/WindowedMergeOperatorTest.java | 165 +++++++++++++++
.../WindowedMergeOperatorTestApplication.java | 203 ++++++++++++++++++
17 files changed, 1719 insertions(+), 167 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/JoinAccumulation.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/JoinAccumulation.java b/library/src/main/java/org/apache/apex/malhar/lib/window/JoinAccumulation.java
deleted file mode 100644
index 69240e0..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/JoinAccumulation.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window;
-
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * This is the interface for accumulation when joining multiple streams.
- *
- * @since 3.5.0
- */
-@InterfaceStability.Evolving
-public interface JoinAccumulation<InputT1, InputT2, InputT3, InputT4, InputT5, AccumT, OutputT> extends Accumulation<InputT1, AccumT, OutputT>
-{
- /**
- * Accumulate the second input type to the accumulated value
- *
- * @param accumulatedValue
- * @param input
- * @return
- */
- AccumT accumulate2(AccumT accumulatedValue, InputT2 input);
-
- /**
- * Accumulate the third input type to the accumulated value
- *
- * @param accumulatedValue
- * @param input
- * @return
- */
- AccumT accumulate3(AccumT accumulatedValue, InputT3 input);
-
- /**
- * Accumulate the fourth input type to the accumulated value
- *
- * @param accumulatedValue
- * @param input
- * @return
- */
- AccumT accumulate4(AccumT accumulatedValue, InputT4 input);
-
- /**
- * Accumulate the fifth input type to the accumulated value
- *
- * @param accumulatedValue
- * @param input
- * @return
- */
- AccumT accumulate5(AccumT accumulatedValue, InputT5 input);
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java b/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java
new file mode 100644
index 0000000..71f4408
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This is the interface for accumulation when merging two input streams into one accumulated value.
+ *
+ * @since 3.5.0
+ */
+@InterfaceStability.Evolving
+public interface MergeAccumulation<InputT1, InputT2, AccumT, OutputT> extends Accumulation<InputT1, AccumT, OutputT>
+{
+ /**
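+ * Accumulate a tuple of the second input type into the accumulated value.
+ *
+ * @param accumulatedValue the value accumulated so far
+ * @param input the tuple of the second input type to accumulate
+ * @return the accumulated value after the input has been applied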
+ */
+ AccumT accumulate2(AccumT accumulatedValue, InputT2 input);
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java
new file mode 100644
index 0000000..89a70a4
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+/**
+ * Interface for a windowed operator that merges a second input stream in addition to the first.
+ */
+public interface MergeWindowedOperator<InputT1, InputT2>
+ extends WindowedOperator<InputT1>
+{
+ void accumulateTuple2(Tuple.WindowedTuple<InputT2> tuple);
+
+ void processWatermark2(ControlTuple.Watermark watermark);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/CoGroup.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/CoGroup.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/CoGroup.java
new file mode 100644
index 0000000..e22d582
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/CoGroup.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * CoGroup Join Accumulation.
+ */
+public class CoGroup<T> extends InnerJoin<T>
+{
+ public CoGroup()
+ {
+ //for kryo
+ }
+
+
+ @Override
+ public List<List<T>> getOutput(List<Set<T>> accumulatedValue)
+ {
+ List<List<T>> result = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ result.add(new ArrayList<T>(accumulatedValue.get(i)));
+ }
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/InnerJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/InnerJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/InnerJoin.java
new file mode 100644
index 0000000..fd250a2
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/InnerJoin.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.apex.malhar.lib.window.MergeAccumulation;
+
+/**
+ * Inner join accumulation that combines tuples of the same type from two different streams.
+ */
+public class InnerJoin<T> implements MergeAccumulation<T, T, List<Set<T>>, List<List<T>>>
+{
+
+ public InnerJoin()
+ {
+ //for kryo
+ }
+
+ @Override
+ public List<Set<T>> accumulate(List<Set<T>> accumulatedValue, T input)
+ {
+ return accumulateWithIndex(0, accumulatedValue, input);
+ }
+
+ @Override
+ public List<Set<T>> accumulate2(List<Set<T>> accumulatedValue, T input)
+ {
+ return accumulateWithIndex(1, accumulatedValue, input);
+ }
+
+
+ public List<Set<T>> accumulateWithIndex(int index, List<Set<T>> accumulatedValue, T input)
+ {
+ Set<T> accuSet = accumulatedValue.get(index);
+ accuSet.add(input);
+ accumulatedValue.set(index, accuSet);
+ return accumulatedValue;
+ }
+
+ @Override
+ public List<Set<T>> defaultAccumulatedValue()
+ {
+ ArrayList<Set<T>> accu = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ accu.add(new HashSet<T>());
+ }
+ return accu;
+ }
+
+ @Override
+ public List<Set<T>> merge(List<Set<T>> accumulatedValue1, List<Set<T>> accumulatedValue2)
+ {
+ for (int i = 0; i < 2; i++) {
+ Set<T> accuSet1 = accumulatedValue1.get(i);
+ Set<T> accuSet2 = accumulatedValue2.get(i);
+ accuSet1.addAll(accuSet2);
+ accumulatedValue1.set(i, accuSet1);
+ }
+ return accumulatedValue1;
+ }
+
+ @Override
+ public List<List<T>> getOutput(List<Set<T>> accumulatedValue)
+ {
+ List<List<T>> result = new ArrayList<>();
+
+ // TODO: May need to revisit (use state manager).
+ result = getAllCombo(accumulatedValue, result, new ArrayList<T>());
+
+ return result;
+ }
+
+
+ public List<List<T>> getAllCombo(List<Set<T>> accu, List<List<T>> result, List<T> curList)
+ {
+ if (curList.size() == 2) {
+ result.add(curList);
+ return result;
+ } else {
+ for (T item : accu.get(curList.size())) {
+ List<T> tempList = new ArrayList<>(curList);
+ tempList.add(item);
+ result = getAllCombo(accu, result, tempList);
+ }
+ return result;
+ }
+ }
+
+
+ @Override
+ public List<List<T>> getRetraction(List<List<T>> value)
+ {
+ return new ArrayList<>();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
new file mode 100644
index 0000000..4b3bc69
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.lang.reflect.Field;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.window.MergeAccumulation;
+
+import com.google.common.base.Throwables;
+
+/**
+ * Inner join Accumulation for Pojo Streams.
+ */
+public class PojoInnerJoin<InputT1, InputT2>
+ implements MergeAccumulation<InputT1, InputT2, List<List<Map<String, Object>>>, List<Map<String, Object>>>
+{
+ protected String[] keys;
+
+ public PojoInnerJoin()
+ {
+ throw new IllegalArgumentException("Please specify number of streams that are joining.");
+ }
+
+ public PojoInnerJoin(int num, String... keys)
+ {
+ if (keys.length != 2) {
+ throw new IllegalArgumentException("Wrong number of keys.");
+ }
+
+ this.keys = Arrays.copyOf(keys, keys.length);
+ }
+
+ @Override
+ public List<List<Map<String, Object>>> accumulate(List<List<Map<String, Object>>> accumulatedValue, InputT1 input)
+ {
+ try {
+ return accumulateWithIndex(0, accumulatedValue, input);
+ } catch (NoSuchFieldException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public List<List<Map<String, Object>>> accumulate2(List<List<Map<String, Object>>> accumulatedValue, InputT2 input)
+ {
+ try {
+ return accumulateWithIndex(1, accumulatedValue, input);
+ } catch (NoSuchFieldException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+
+ @Override
+ public List<List<Map<String, Object>>> defaultAccumulatedValue()
+ {
+ List<List<Map<String, Object>>> accu = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ accu.add(new ArrayList<Map<String, Object>>());
+ }
+ return accu;
+ }
+
+
+ private List<List<Map<String, Object>>> accumulateWithIndex(int index, List<List<Map<String, Object>>> accu, Object input) throws NoSuchFieldException
+ {
+ // TODO: If a stream never sends out any tuple during one window, a wrong key would not be detected.
+
+ input.getClass().getDeclaredField(keys[index]);
+
+ List<Map<String, Object>> curList = accu.get(index);
+ Map map = pojoToMap(input);
+ curList.add(map);
+ accu.set(index, curList);
+
+ return accu;
+ }
+
+ private Map<String, Object> pojoToMap(Object input)
+ {
+ Map<String, Object> map = new HashMap<>();
+
+ Field[] fields = input.getClass().getDeclaredFields();
+
+ for (Field field : fields) {
+ String[] words = field.getName().split("\\.");
+ String fieldName = words[words.length - 1];
+ field.setAccessible(true);
+ try {
+ Object value = field.get(input);
+ map.put(fieldName, value);
+ } catch (IllegalAccessException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+ return map;
+ }
+
+ @Override
+ public List<List<Map<String, Object>>> merge(List<List<Map<String, Object>>> accumulatedValue1, List<List<Map<String, Object>>> accumulatedValue2)
+ {
+ for (int i = 0; i < 2; i++) {
+ List<Map<String, Object>> curList = accumulatedValue1.get(i);
+ curList.addAll(accumulatedValue2.get(i));
+ accumulatedValue1.set(i, curList);
+ }
+ return accumulatedValue1;
+ }
+
+ @Override
+ public List<Map<String, Object>> getOutput(List<List<Map<String, Object>>> accumulatedValue)
+ {
+ List<Map<String, Object>> result = new ArrayList<>();
+
+ // TODO: May need to revisit (use state manager).
+ result = getAllCombo(0, accumulatedValue, result, null);
+
+ return result;
+ }
+
+
+ private List<Map<String, Object>> getAllCombo(int streamIndex, List<List<Map<String, Object>>> accu, List<Map<String, Object>> result, Map<String, Object> curMap)
+ {
+ if (streamIndex == 2) {
+ if (curMap != null) {
+ result.add(curMap);
+ }
+ return result;
+ } else {
+ for (Map<String, Object> map : accu.get(streamIndex)) {
+ if (streamIndex == 0) {
+ Map<String, Object> tempMap = new HashMap<>(map);
+ result = getAllCombo(streamIndex + 1, accu, result, tempMap);
+ } else if (curMap == null) {
+ return result;
+ } else {
+ Map<String, Object> tempMap = new HashMap<>(curMap);
+ tempMap = joinTwoMapsWithKeys(tempMap, keys[0], map, keys[streamIndex]);
+ result = getAllCombo(streamIndex + 1, accu, result, tempMap);
+ }
+ }
+ return result;
+ }
+ }
+
+ private Map<String, Object> joinTwoMapsWithKeys(Map<String, Object> map1, String key1, Map<String, Object> map2, String key2)
+ {
+ if (!map1.get(key1).equals(map2.get(key2))) {
+ return null;
+ } else {
+ for (String field : map2.keySet()) {
+ if (!field.equals(key2)) {
+ map1.put(field, map2.get(field));
+ }
+ }
+ return map1;
+ }
+ }
+
+ @Override
+ public List<Map<String, Object>> getRetraction(List<Map<String, Object>> value)
+ {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedMergeOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedMergeOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedMergeOperator.java
new file mode 100644
index 0000000..05a2495
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedMergeOperator.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.MergeAccumulation;
+import org.apache.apex.malhar.lib.window.MergeWindowedOperator;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+
+import com.google.common.base.Function;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+
+
+/**
+ * Abstract Windowed Merge Operator.
+ */
+public abstract class AbstractWindowedMergeOperator<InputT1, InputT2, OutputT, DataStorageT extends WindowedStorage,
+ RetractionStorageT extends WindowedStorage, AccumulationT extends
+ MergeAccumulation>
+ extends AbstractWindowedOperator<InputT1, OutputT, DataStorageT, RetractionStorageT, AccumulationT>
+ implements MergeWindowedOperator<InputT1, InputT2>
+{
+
+ private static final transient Logger LOG = LoggerFactory.getLogger(AbstractWindowedMergeOperator.class);
+ private Function<InputT2, Long> timestampExtractor2;
+
+ private long latestWatermark1 = -1; // latest watermark from stream 1
+ private long latestWatermark2 = -1; // latest watermark from stream 2
+
+ public final transient DefaultInputPort<Tuple<InputT2>> input2 = new DefaultInputPort<Tuple<InputT2>>()
+ {
+ @Override
+ public void process(Tuple<InputT2> tuple)
+ {
+ processTuple2(tuple);
+ }
+ };
+
+ // TODO: This port should be removed when Apex Core has native support for custom control tuples
+ @InputPortFieldAnnotation(optional = true)
+ public final transient DefaultInputPort<ControlTuple> controlInput2 = new DefaultInputPort<ControlTuple>()
+ {
+ @Override
+ public void process(ControlTuple tuple)
+ {
+ if (tuple instanceof ControlTuple.Watermark) {
+ processWatermark2((ControlTuple.Watermark)tuple);
+ }
+ }
+ };
+
+ public void processTuple2(Tuple<InputT2> tuple)
+ {
+ long timestamp = extractTimestamp(tuple, timestampExtractor2);
+ if (isTooLate(timestamp)) {
+ dropTuple(tuple);
+ } else {
+ Tuple.WindowedTuple<InputT2> windowedTuple = getWindowedValueWithTimestamp(tuple, timestamp);
+ // do the accumulation
+ accumulateTuple2(windowedTuple);
+ processWindowState(windowedTuple);
+ }
+ }
+
+ public void setTimestampExtractor2(Function<InputT2, Long> timestampExtractor2)
+ {
+ this.timestampExtractor2 = timestampExtractor2;
+ }
+
+
+ @Override
+ public void processWatermark(ControlTuple.Watermark watermark)
+ {
+ latestWatermark1 = watermark.getTimestamp();
+ if (latestWatermark1 >= 0 && latestWatermark2 >= 0) {
+ // Select the smallest timestamp of the latest watermarks as the watermark of the operator.
+ long minWatermark = Math.min(latestWatermark1, latestWatermark2);
+ if (this.watermarkTimestamp < 0 || minWatermark != this.watermarkTimestamp) {
+ this.watermarkTimestamp = minWatermark;
+ }
+ }
+ }
+
+ @Override
+ public void processWatermark2(ControlTuple.Watermark watermark)
+ {
+ latestWatermark2 = watermark.getTimestamp();
+ if (latestWatermark1 >= 0 && latestWatermark2 >= 0) {
+ long minWatermark = Math.min(latestWatermark1, latestWatermark2);
+ if (this.watermarkTimestamp < 0 || minWatermark != this.watermarkTimestamp) {
+ this.watermarkTimestamp = minWatermark;
+ }
+ }
+ }
+
+ @Override
+ protected void processWatermarkAtEndWindow()
+ {
+ if (fixedWatermarkMillis > 0 || this.watermarkTimestamp != this.currentWatermark) {
+ super.processWatermarkAtEndWindow();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
index 0ece11e..f965a01 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
@@ -79,16 +79,16 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
private Function<InputT, Long> timestampExtractor;
- private long currentWatermark = -1;
- private long watermarkTimestamp = -1;
+ protected long currentWatermark = -1;
+ protected long watermarkTimestamp = -1;
private boolean triggerAtWatermark;
- private long earlyTriggerCount;
+ protected long earlyTriggerCount;
private long earlyTriggerMillis;
- private long lateTriggerCount;
+ protected long lateTriggerCount;
private long lateTriggerMillis;
private long currentDerivedTimestamp = -1;
private long timeIncrement;
- private long fixedWatermarkMillis = -1;
+ protected long fixedWatermarkMillis = -1;
private Map<String, Component<Context.OperatorContext>> components = new HashMap<>();
@@ -96,7 +96,9 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
protected RetractionStorageT retractionStorage;
protected AccumulationT accumulation;
- private static final transient Collection<? extends Window> GLOBAL_WINDOW_SINGLETON_SET = Collections.singleton(Window.GlobalWindow.INSTANCE);
+
+ protected static final transient Collection<? extends Window> GLOBAL_WINDOW_SINGLETON_SET = Collections.singleton(Window.GlobalWindow.INSTANCE);
+
private static final transient Logger LOG = LoggerFactory.getLogger(AbstractWindowedOperator.class);
public final transient DefaultInputPort<Tuple<InputT>> input = new DefaultInputPort<Tuple<InputT>>()
@@ -135,28 +137,32 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
*/
public void processTuple(Tuple<InputT> tuple)
{
- long timestamp = extractTimestamp(tuple);
+ long timestamp = extractTimestamp(tuple, timestampExtractor);
if (isTooLate(timestamp)) {
dropTuple(tuple);
} else {
Tuple.WindowedTuple<InputT> windowedTuple = getWindowedValue(tuple);
// do the accumulation
accumulateTuple(windowedTuple);
+ processWindowState(windowedTuple);
+ }
+ }
- for (Window window : windowedTuple.getWindows()) {
- WindowState windowState = windowStateMap.get(window);
- windowState.tupleCount++;
- // process any count based triggers
- if (windowState.watermarkArrivalTime == -1) {
- // watermark has not arrived yet, check for early count based trigger
- if (earlyTriggerCount > 0 && (windowState.tupleCount % earlyTriggerCount) == 0) {
- fireTrigger(window, windowState);
- }
- } else {
- // watermark has arrived, check for late count based trigger
- if (lateTriggerCount > 0 && (windowState.tupleCount % lateTriggerCount) == 0) {
- fireTrigger(window, windowState);
- }
+ protected void processWindowState(Tuple.WindowedTuple<? extends Object> windowedTuple)
+ {
+ for (Window window : windowedTuple.getWindows()) {
+ WindowState windowState = windowStateMap.get(window);
+ windowState.tupleCount++;
+ // process any count based triggers
+ if (windowState.watermarkArrivalTime == -1) {
+ // watermark has not arrived yet, check for early count based trigger
+ if (earlyTriggerCount > 0 && (windowState.tupleCount % earlyTriggerCount) == 0) {
+ fireTrigger(window, windowState);
+ }
+ } else {
+ // watermark has arrived, check for late count based trigger
+ if (lateTriggerCount > 0 && (windowState.tupleCount % lateTriggerCount) == 0) {
+ fireTrigger(window, windowState);
}
}
}
@@ -292,15 +298,31 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
@Override
public Tuple.WindowedTuple<InputT> getWindowedValue(Tuple<InputT> input)
{
+ long timestamp = extractTimestamp(input, timestampExtractor);
+ return getWindowedValueWithTimestamp(input, timestamp);
+ }
+
+ public <T> Tuple.WindowedTuple<T> getWindowedValueWithTimestamp(Tuple<T> input, long timestamp)
+ {
if (windowOption == null && input instanceof Tuple.WindowedTuple) {
// inherit the windows from upstream
- return (Tuple.WindowedTuple<InputT>)input;
+ initializeWindowStates(((Tuple.WindowedTuple<T>)input).getWindows());
+ return (Tuple.WindowedTuple<T>)input;
} else {
- return new Tuple.WindowedTuple<>(assignWindows(input), extractTimestamp(input), input.getValue());
+ return new Tuple.WindowedTuple<>(assignWindows(input, timestamp), timestamp, input.getValue());
}
}
- private long extractTimestamp(Tuple<InputT> tuple)
+ protected void initializeWindowStates(Collection<? extends Window> windows)
+ {
+ for (Window window : windows) {
+ if (!windowStateMap.containsWindow(window)) {
+ windowStateMap.put(window, new WindowState());
+ }
+ }
+ }
+
+ protected <T> long extractTimestamp(Tuple<T> tuple, Function<T, Long> timestampExtractor)
{
if (timestampExtractor == null) {
if (tuple instanceof Tuple.TimestampedTuple) {
@@ -313,19 +335,14 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
}
}
- private Collection<? extends Window> assignWindows(Tuple<InputT> inputTuple)
+ protected <T> Collection<? extends Window> assignWindows(Tuple<T> inputTuple, long timestamp)
{
if (windowOption instanceof WindowOption.GlobalWindow) {
return GLOBAL_WINDOW_SINGLETON_SET;
} else {
- long timestamp = extractTimestamp(inputTuple);
if (windowOption instanceof WindowOption.TimeWindows) {
Collection<? extends Window> windows = getTimeWindowsForTimestamp(timestamp);
- for (Window window : windows) {
- if (!windowStateMap.containsWindow(window)) {
- windowStateMap.put(window, new WindowState());
- }
- }
+ initializeWindowStates(windows);
return windows;
} else if (windowOption instanceof WindowOption.SessionWindows) {
return assignSessionWindows(timestamp, inputTuple);
@@ -335,7 +352,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
}
}
- protected Collection<Window.SessionWindow> assignSessionWindows(long timestamp, Tuple<InputT> inputTuple)
+ protected <T> Collection<Window.SessionWindow> assignSessionWindows(long timestamp, Tuple<T> inputTuple)
{
throw new UnsupportedOperationException("Session window require keyed tuples");
}
@@ -348,7 +365,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
* @param timestamp the timestamp
* @return the windows this timestamp belongs to
*/
- private Collection<Window.TimeWindow> getTimeWindowsForTimestamp(long timestamp)
+ protected Collection<Window.TimeWindow> getTimeWindowsForTimestamp(long timestamp)
{
List<Window.TimeWindow> windows = new ArrayList<>();
if (windowOption instanceof WindowOption.TimeWindows) {
@@ -382,7 +399,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
}
@Override
- public void dropTuple(Tuple<InputT> input)
+ public void dropTuple(Tuple input)
{
// do nothing
LOG.debug("Dropping late tuple {}", input);
@@ -464,7 +481,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
}
}
- private void processWatermarkAtEndWindow()
+ protected void processWatermarkAtEndWindow()
{
if (fixedWatermarkMillis > 0) {
watermarkTimestamp = currentDerivedTimestamp - fixedWatermarkMillis;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java
new file mode 100644
index 0000000..a5f17c5
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.window.MergeAccumulation;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+
+/**
+ * Keyed Windowed Merge Operator that merges two streams of keyed tuples by key. Please use
+ * {@link WindowedMergeOperatorImpl} for non-keyed merging.
+ *
+ * @param <KeyT> Type of the key used to merge two streams.
+ * @param <InputT1> The type of the value of the keyed input tuple from first stream.
+ * @param <InputT2> The type of the value of the keyed input tuple from second stream.
+ * @param <AccumT> The type of the accumulated value in the operator state per key per window.
+ * @param <OutputT> The type of the value of the keyed output tuple.
+ */
+public class KeyedWindowedMergeOperatorImpl<KeyT, InputT1, InputT2, AccumT, OutputT>
+ extends AbstractWindowedMergeOperator<KeyValPair<KeyT, InputT1>, KeyValPair<KeyT, InputT2>, KeyValPair<KeyT, OutputT>, WindowedStorage.WindowedKeyedStorage<KeyT, AccumT>, WindowedStorage.WindowedKeyedStorage<KeyT, OutputT>, MergeAccumulation<InputT1, InputT2, AccumT, OutputT>>
+{
+ // TODO: Add session window support.
+
+ private abstract class AccumFunction<T>
+ {
+ abstract AccumT accumulate(AccumT accum, T value);
+ }
+
+ private <T> void accumulateTupleHelper(Tuple.WindowedTuple<KeyValPair<KeyT, T>> tuple, AccumFunction<T> accumFn)
+ {
+ final KeyValPair<KeyT, T> kvData = tuple.getValue();
+ KeyT key = kvData.getKey();
+ for (Window window : tuple.getWindows()) {
+ // process each window
+ AccumT accum = dataStorage.get(window, key);
+ if (accum == null) {
+ accum = accumulation.defaultAccumulatedValue();
+ }
+ dataStorage.put(window, key, accumFn.accumulate(accum, kvData.getValue()));
+ }
+ }
+
+ @Override
+ public void accumulateTuple(Tuple.WindowedTuple<KeyValPair<KeyT, InputT1>> tuple)
+ {
+ accumulateTupleHelper(tuple, new AccumFunction<InputT1>()
+ {
+ @Override
+ AccumT accumulate(AccumT accum, InputT1 value)
+ {
+ return accumulation.accumulate(accum, value);
+ }
+ });
+ }
+
+ @Override
+ public void accumulateTuple2(Tuple.WindowedTuple<KeyValPair<KeyT, InputT2>> tuple)
+ {
+ accumulateTupleHelper(tuple, new AccumFunction<InputT2>()
+ {
+ @Override
+ AccumT accumulate(AccumT accum, InputT2 value)
+ {
+ return accumulation.accumulate2(accum, value);
+ }
+ });
+ }
+
+ @Override
+ public void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes)
+ {
+ for (Map.Entry<KeyT, AccumT> entry : dataStorage.entries(window)) {
+ OutputT outputVal = accumulation.getOutput(entry.getValue());
+ if (fireOnlyUpdatedPanes) {
+ OutputT oldValue = retractionStorage.get(window, entry.getKey());
+ if (oldValue != null && oldValue.equals(outputVal)) {
+ continue;
+ }
+ }
+ output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), outputVal)));
+ if (retractionStorage != null) {
+ retractionStorage.put(window, entry.getKey(), outputVal);
+ }
+ }
+ }
+
+ @Override
+ public void fireRetractionTrigger(Window window, boolean firingOnlyUpdatedPanes)
+ {
+ if (triggerOption.getAccumulationMode() != TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
+ throw new UnsupportedOperationException();
+ }
+ for (Map.Entry<KeyT, OutputT> entry : retractionStorage.entries(window)) {
+ output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), accumulation.getRetraction(entry.getValue()))));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
index 6fab7de..a33133b 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
@@ -52,81 +52,85 @@ public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
{
@Override
- protected Collection<Window.SessionWindow> assignSessionWindows(long timestamp, Tuple<KeyValPair<KeyT, InputValT>> inputTuple)
+ protected <T> Collection<Window.SessionWindow> assignSessionWindows(long timestamp, Tuple<T> inputTuple)
{
- KeyT key = inputTuple.getValue().getKey();
- WindowOption.SessionWindows sessionWindowOption = (WindowOption.SessionWindows)windowOption;
- SessionWindowedStorage<KeyT, AccumT> sessionStorage = (SessionWindowedStorage<KeyT, AccumT>)dataStorage;
- Collection<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> sessionEntries = sessionStorage.getSessionEntries(key, timestamp, sessionWindowOption.getMinGap().getMillis());
- Window.SessionWindow<KeyT> sessionWindowToAssign;
- switch (sessionEntries.size()) {
- case 0: {
- // There are no existing windows within the minimum gap. Create a new session window
- Window.SessionWindow<KeyT> sessionWindow = new Window.SessionWindow<>(key, timestamp, 1);
- windowStateMap.put(sessionWindow, new WindowState());
- sessionWindowToAssign = sessionWindow;
- break;
- }
- case 1: {
- // There is already one existing window within the minimum gap. See whether we need to extend the time of that window
- Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry = sessionEntries.iterator().next();
- Window.SessionWindow<KeyT> sessionWindow = sessionWindowEntry.getKey();
- if (sessionWindow.getBeginTimestamp() <= timestamp && timestamp < sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis()) {
- // The session window already covers the event
+ if (!(inputTuple.getValue() instanceof KeyValPair)) {
+ throw new UnsupportedOperationException("Session window require keyed tuples");
+ } else {
+ KeyT key = ((KeyValPair<KeyT, ?>)inputTuple.getValue()).getKey();
+ WindowOption.SessionWindows sessionWindowOption = (WindowOption.SessionWindows)windowOption;
+ SessionWindowedStorage<KeyT, AccumT> sessionStorage = (SessionWindowedStorage<KeyT, AccumT>)dataStorage;
+ Collection<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> sessionEntries = sessionStorage.getSessionEntries(key, timestamp, sessionWindowOption.getMinGap().getMillis());
+ Window.SessionWindow<KeyT> sessionWindowToAssign;
+ switch (sessionEntries.size()) {
+ case 0: {
+ // There are no existing windows within the minimum gap. Create a new session window
+ Window.SessionWindow<KeyT> sessionWindow = new Window.SessionWindow<>(key, timestamp, 1);
+ windowStateMap.put(sessionWindow, new WindowState());
sessionWindowToAssign = sessionWindow;
- } else {
- // The session window does not cover the event but is within the min gap
+ break;
+ }
+ case 1: {
+ // There is already one existing window within the minimum gap. See whether we need to extend the time of that window
+ Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry = sessionEntries.iterator().next();
+ Window.SessionWindow<KeyT> sessionWindow = sessionWindowEntry.getKey();
+ if (sessionWindow.getBeginTimestamp() <= timestamp && timestamp < sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis()) {
+ // The session window already covers the event
+ sessionWindowToAssign = sessionWindow;
+ } else {
+ // The session window does not cover the event but is within the min gap
+ if (triggerOption != null &&
+ triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
+ // fire a retraction trigger because the session window will be enlarged
+ fireRetractionTrigger(sessionWindow, false);
+ }
+ // create a new session window that covers the timestamp
+ long newBeginTimestamp = Math.min(sessionWindow.getBeginTimestamp(), timestamp);
+ long newEndTimestamp = Math.max(sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis(), timestamp + 1);
+ Window.SessionWindow<KeyT> newSessionWindow =
+ new Window.SessionWindow<>(key, newBeginTimestamp, newEndTimestamp - newBeginTimestamp);
+ windowStateMap.remove(sessionWindow);
+ sessionStorage.migrateWindow(sessionWindow, newSessionWindow);
+ windowStateMap.put(newSessionWindow, new WindowState());
+ sessionWindowToAssign = newSessionWindow;
+ }
+ break;
+ }
+ case 2: {
+ // There are two windows that fall within the minimum gap of the timestamp. We need to merge the two windows
+ Iterator<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> iterator = sessionEntries.iterator();
+ Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry1 = iterator.next();
+ Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry2 = iterator.next();
+ Window.SessionWindow<KeyT> sessionWindow1 = sessionWindowEntry1.getKey();
+ Window.SessionWindow<KeyT> sessionWindow2 = sessionWindowEntry2.getKey();
+ AccumT sessionData1 = sessionWindowEntry1.getValue();
+ AccumT sessionData2 = sessionWindowEntry2.getValue();
if (triggerOption != null &&
triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
- // fire a retraction trigger because the session window will be enlarged
- fireRetractionTrigger(sessionWindow, false);
+ // fire a retraction trigger because the two session windows will be merged to a new window
+ fireRetractionTrigger(sessionWindow1, false);
+ fireRetractionTrigger(sessionWindow2, false);
}
- // create a new session window that covers the timestamp
- long newBeginTimestamp = Math.min(sessionWindow.getBeginTimestamp(), timestamp);
- long newEndTimestamp = Math.max(sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis(), timestamp + 1);
- Window.SessionWindow<KeyT> newSessionWindow =
- new Window.SessionWindow<>(key, newBeginTimestamp, newEndTimestamp - newBeginTimestamp);
- windowStateMap.remove(sessionWindow);
- sessionStorage.migrateWindow(sessionWindow, newSessionWindow);
+ long newBeginTimestamp = Math.min(sessionWindow1.getBeginTimestamp(), sessionWindow2.getBeginTimestamp());
+ long newEndTimestamp = Math.max(sessionWindow1.getBeginTimestamp() + sessionWindow1.getDurationMillis(),
+ sessionWindow2.getBeginTimestamp() + sessionWindow2.getDurationMillis());
+
+ Window.SessionWindow<KeyT> newSessionWindow = new Window.SessionWindow<>(key, newBeginTimestamp, newEndTimestamp - newBeginTimestamp);
+ AccumT newSessionData = accumulation.merge(sessionData1, sessionData2);
+ sessionStorage.remove(sessionWindow1);
+ sessionStorage.remove(sessionWindow2);
+ sessionStorage.put(newSessionWindow, key, newSessionData);
+ windowStateMap.remove(sessionWindow1);
+ windowStateMap.remove(sessionWindow2);
windowStateMap.put(newSessionWindow, new WindowState());
sessionWindowToAssign = newSessionWindow;
+ break;
}
- break;
- }
- case 2: {
- // There are two windows that fall within the minimum gap of the timestamp. We need to merge the two windows
- Iterator<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> iterator = sessionEntries.iterator();
- Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry1 = iterator.next();
- Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry2 = iterator.next();
- Window.SessionWindow<KeyT> sessionWindow1 = sessionWindowEntry1.getKey();
- Window.SessionWindow<KeyT> sessionWindow2 = sessionWindowEntry2.getKey();
- AccumT sessionData1 = sessionWindowEntry1.getValue();
- AccumT sessionData2 = sessionWindowEntry2.getValue();
- if (triggerOption != null &&
- triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
- // fire a retraction trigger because the two session windows will be merged to a new window
- fireRetractionTrigger(sessionWindow1, false);
- fireRetractionTrigger(sessionWindow2, false);
- }
- long newBeginTimestamp = Math.min(sessionWindow1.getBeginTimestamp(), sessionWindow2.getBeginTimestamp());
- long newEndTimestamp = Math.max(sessionWindow1.getBeginTimestamp() + sessionWindow1.getDurationMillis(),
- sessionWindow2.getBeginTimestamp() + sessionWindow2.getDurationMillis());
-
- Window.SessionWindow<KeyT> newSessionWindow = new Window.SessionWindow<>(key, newBeginTimestamp, newEndTimestamp - newBeginTimestamp);
- AccumT newSessionData = accumulation.merge(sessionData1, sessionData2);
- sessionStorage.remove(sessionWindow1);
- sessionStorage.remove(sessionWindow2);
- sessionStorage.put(newSessionWindow, key, newSessionData);
- windowStateMap.remove(sessionWindow1);
- windowStateMap.remove(sessionWindow2);
- windowStateMap.put(newSessionWindow, new WindowState());
- sessionWindowToAssign = newSessionWindow;
- break;
+ default:
+ throw new IllegalStateException("There are more than two sessions matching one timestamp");
}
- default:
- throw new IllegalStateException("There are more than two sessions matching one timestamp");
+ return Collections.<Window.SessionWindow>singleton(sessionWindowToAssign);
}
- return Collections.<Window.SessionWindow>singleton(sessionWindowToAssign);
}
@Override
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java
new file mode 100644
index 0000000..38eeff0
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import org.apache.apex.malhar.lib.window.MergeAccumulation;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+
+/**
+ * Windowed Merge Operator to merge two streams together. It aggregates tuples from two
+ * input streams, performs the merge operation based on its merge accumulation, and outputs one
+ * result stream to downstream.
+ *
+ * @param <InputT1> The type of input tuple from first stream.
+ * @param <InputT2> The type of input tuple from second stream.
+ * @param <AccumT> The type of the accumulated value in the operator state per window.
+ * @param <OutputT> The type of output tuple.
+ */
+public class WindowedMergeOperatorImpl<InputT1, InputT2, AccumT, OutputT>
+ extends AbstractWindowedMergeOperator<InputT1, InputT2, OutputT, WindowedStorage.WindowedPlainStorage<AccumT>, WindowedStorage.WindowedPlainStorage<OutputT>, MergeAccumulation<InputT1, InputT2, AccumT, OutputT>>
+{
+ private abstract class AccumFunction<T>
+ {
+ abstract AccumT accumulate(AccumT accum, T value);
+ }
+
+ private <T> void accumulateTupleHelper(Tuple.WindowedTuple<T> tuple, AccumFunction<T> accumFn)
+ {
+ for (Window window : tuple.getWindows()) {
+ // process each window
+ AccumT accum = dataStorage.get(window);
+ if (accum == null) {
+ accum = accumulation.defaultAccumulatedValue();
+ }
+ dataStorage.put(window, accumFn.accumulate(accum, tuple.getValue()));
+ }
+ }
+
+ @Override
+ public void accumulateTuple2(Tuple.WindowedTuple<InputT2> tuple)
+ {
+ accumulateTupleHelper(tuple, new AccumFunction<InputT2>()
+ {
+ @Override
+ AccumT accumulate(AccumT accum, InputT2 value)
+ {
+ return accumulation.accumulate2(accum, value);
+ }
+ });
+ }
+
+ @Override
+ public void accumulateTuple(Tuple.WindowedTuple<InputT1> tuple)
+ {
+ accumulateTupleHelper(tuple, new AccumFunction<InputT1>()
+ {
+ @Override
+ AccumT accumulate(AccumT accum, InputT1 value)
+ {
+ return accumulation.accumulate(accum, value);
+ }
+ });
+ }
+
+ @Override
+ public void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes)
+ {
+ AccumT accumulatedValue = dataStorage.get(window);
+ OutputT outputValue = accumulation.getOutput(accumulatedValue);
+
+ if (fireOnlyUpdatedPanes) {
+ OutputT oldValue = retractionStorage.get(window);
+ if (oldValue != null && oldValue.equals(outputValue)) {
+ return;
+ }
+ }
+ output.emit(new Tuple.WindowedTuple<>(window, outputValue));
+ if (retractionStorage != null) {
+ retractionStorage.put(window, outputValue);
+ }
+ }
+
+ @Override
+ public void fireRetractionTrigger(Window window, boolean firingOnlyUpdatedPanes)
+ {
+ if (triggerOption.getAccumulationMode() != TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
+ throw new UnsupportedOperationException();
+ }
+ OutputT oldValue = retractionStorage.get(window);
+ if (oldValue != null) {
+ output.emit(new Tuple.WindowedTuple<>(window, accumulation.getRetraction(oldValue)));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/CoGroupTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/CoGroupTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/CoGroupTest.java
new file mode 100644
index 0000000..d988081
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/CoGroupTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link CoGroup}.
+ */
+public class CoGroupTest
+{
+
+ @Test
+ public void CoGroupTest()
+ {
+ CoGroup<Long> cg = new CoGroup<Long>();
+ List<Set<Long>> accu = cg.defaultAccumulatedValue();
+
+ Assert.assertEquals(2, accu.size());
+ for (int i = 0; i < 2; i++) {
+ Assert.assertEquals(0, accu.get(i).size());
+ }
+
+ for (long i = 1; i <= 3; i++) {
+ accu = cg.accumulate(accu, i);
+ accu = cg.accumulate2(accu, i * 2);
+ }
+
+ for (int i = 0; i < 2; i++) {
+ Assert.assertEquals(3, accu.get(i).size());
+ }
+
+ Assert.assertEquals(2, cg.getOutput(accu).size());
+ for (int i = 0; i < 2; i++) {
+ Assert.assertEquals(3, cg.getOutput(accu).get(i).size());
+ }
+ Assert.assertTrue(1 == cg.getOutput(accu).get(0).get(0));
+ Assert.assertTrue(4 == cg.getOutput(accu).get(1).get(1));
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/InnerJoinTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/InnerJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/InnerJoinTest.java
new file mode 100644
index 0000000..1a379a4
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/InnerJoinTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link InnerJoin}.
+ */
+public class InnerJoinTest
+{
+ @Test
+ public void CombineTest()
+ {
+ InnerJoin<Long> cb = new InnerJoin<Long>();
+ List<Set<Long>> accu = cb.defaultAccumulatedValue();
+
+ Assert.assertEquals(2, accu.size());
+ for (int i = 0; i < 2; i++) {
+ Assert.assertEquals(0, accu.get(i).size());
+ }
+
+ for (long i = 1; i <= 3; i++) {
+ accu = cb.accumulate(accu, i);
+ accu = cb.accumulate2(accu, i * 2);
+ }
+
+ for (int i = 0; i < 2; i++) {
+ Assert.assertEquals(3, accu.get(i).size());
+ }
+
+ Assert.assertEquals(9, cb.getOutput(accu).size());
+ for (int i = 0; i < 9; i++) {
+ Assert.assertEquals(2, cb.getOutput(accu).get(i).size());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
new file mode 100644
index 0000000..63690d1
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoinTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link PojoInnerJoin}.
+ */
+public class PojoInnerJoinTest
+{
+
+ public static class TestPojo1
+ {
+ private int uId;
+ private String uName;
+
+ public TestPojo1()
+ {
+
+ }
+
+ public TestPojo1(int id, String name)
+ {
+ this.uId = id;
+ this.uName = name;
+ }
+
+ public int getuId()
+ {
+ return uId;
+ }
+
+ public void setuId(int uId)
+ {
+ this.uId = uId;
+ }
+
+ public String getuName()
+ {
+ return uName;
+ }
+
+ public void setuName(String uName)
+ {
+ this.uName = uName;
+ }
+ }
+
+ public static class TestPojo2
+ {
+ private int uId;
+ private String dep;
+
+ public TestPojo2()
+ {
+
+ }
+
+ public TestPojo2(int id, String dep)
+ {
+ this.uId = id;
+ this.dep = dep;
+ }
+
+ public int getuId()
+ {
+ return uId;
+ }
+
+ public void setuId(int uId)
+ {
+ this.uId = uId;
+ }
+
+ public String getDep()
+ {
+ return dep;
+ }
+
+ public void setDep(String dep)
+ {
+ this.dep = dep;
+ }
+ }
+
+
+ @Test
+ public void PojoInnerJoinTest()
+ {
+ PojoInnerJoin<TestPojo1, TestPojo2> pij = new PojoInnerJoin<>(2, "uId", "uId");
+
+ List<List<Map<String, Object>>> accu = pij.defaultAccumulatedValue();
+
+ Assert.assertEquals(2, accu.size());
+
+ accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
+ accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
+
+ accu = pij.accumulate2(accu, new TestPojo2(1, "CS"));
+ accu = pij.accumulate2(accu, new TestPojo2(3, "ECE"));
+
+ Map<String, Object> result = new HashMap<>();
+ result.put("uId", 1);
+ result.put("uName", "Josh");
+ result.put("dep", "CS");
+
+ Assert.assertEquals(1, pij.getOutput(accu).size());
+ Assert.assertEquals(result, pij.getOutput(accu).get(0));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/test/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorTestApplication.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorTestApplication.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorTestApplication.java
new file mode 100644
index 0000000..15e3d4e
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorTestApplication.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.apex.malhar.lib.window.accumulation.InnerJoin;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Example application using {@link KeyedWindowedMergeOperatorImpl}. Generators send streams of key-value pairs of
+ * <{@link String}, {@link Integer}>; the Merge operator combines the two streams based on the key of the tuple.
+ */
+public class KeyedWindowedMergeOperatorTestApplication implements StreamingApplication
+{
+ private static WindowedStorage.WindowedPlainStorage<WindowState> windowStateMap = new InMemoryWindowedStorage<>();
+ private static final long windowDuration = 1000;
+ private static final String[] keys = new String[]{"A", "B", "C", "D", "E"};
+
+
+ public static Window.TimeWindow assignTestWindow(long timestamp)
+ {
+ long beginTimestamp = timestamp - timestamp % windowDuration;
+ Window.TimeWindow window = new Window.TimeWindow(beginTimestamp, windowDuration);
+ if (!windowStateMap.containsWindow(window)) {
+ windowStateMap.put(window, new WindowState());
+ }
+ return window;
+ }
+
+ public static class NumGen1 extends BaseOperator implements InputOperator
+ {
+ private int i;
+ private long watermarkTime;
+ private long startingTime;
+
+ public final transient DefaultOutputPort<Tuple.WindowedTuple<KeyValPair<String, Integer>>> output = new DefaultOutputPort<>();
+ public final transient DefaultOutputPort<ControlTuple> watermarkDefaultOutputPort = new DefaultOutputPort<>();
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ startingTime = System.currentTimeMillis();
+ watermarkTime = System.currentTimeMillis() + 10000;
+ i = 1;
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ while (i <= 20) {
+ if (System.currentTimeMillis() - startingTime >= (i + 1) * 400) {
+ output.emit(new Tuple.WindowedTuple<KeyValPair<String, Integer>>(assignTestWindow(System.currentTimeMillis()), new KeyValPair<String, Integer>(keys[i % 5], i)));
+ i++;
+ }
+ }
+ }
+
+ @Override
+ public void endWindow()
+ {
+ if (i <= 20) {
+ watermarkDefaultOutputPort.emit(new WatermarkImpl(watermarkTime));
+ }
+ }
+ }
+
+ public static class NumGen2 extends BaseOperator implements InputOperator
+ {
+ private int i;
+ private long watermarkTime;
+ private long startingTime;
+
+ public final transient DefaultOutputPort<Tuple.WindowedTuple<KeyValPair<String, Integer>>> output = new DefaultOutputPort<>();
+ public final transient DefaultOutputPort<ControlTuple> watermarkDefaultOutputPort = new DefaultOutputPort<>();
+
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ startingTime = System.currentTimeMillis();
+ watermarkTime = System.currentTimeMillis() + 10000;
+ i = 1;
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ while (i <= 20) {
+ if (System.currentTimeMillis() - startingTime >= (i + 1) * 400) {
+ output.emit(new Tuple.WindowedTuple<KeyValPair<String, Integer>>(assignTestWindow(System.currentTimeMillis()), new KeyValPair<String, Integer>(keys[i % 5], 10 * i)));
+ i++;
+ }
+ }
+ }
+
+ @Override
+ public void endWindow()
+ {
+ if (i <= 20) {
+ watermarkDefaultOutputPort.emit(new WatermarkImpl(watermarkTime));
+ }
+ }
+ }
+
+ public static class Collector extends BaseOperator
+ {
+ public static Map<String, List<List<Integer>>> result = new HashMap<>();
+
+ public final transient DefaultOutputPort<Tuple.WindowedTuple<KeyValPair<String, List<List<Integer>>>>> output = new DefaultOutputPort<>();
+
+ public final transient DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<List<Integer>>>>> input = new DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<List<Integer>>>>>()
+ {
+ @Override
+ public void process(Tuple.WindowedTuple<KeyValPair<String, List<List<Integer>>>> tuple)
+ {
+ result.put(tuple.getValue().getKey(), tuple.getValue().getValue());
+ output.emit(tuple);
+ }
+ };
+ }
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ KeyedWindowedMergeOperatorImpl<String, Integer, Integer, List<Set<Integer>>, List<List<Integer>>> op
+ = dag.addOperator("Merge", new KeyedWindowedMergeOperatorImpl<String, Integer, Integer, List<Set<Integer>>, List<List<Integer>>>());
+
+ //op.setAccumulation(new CoGroup<Integer>());
+ op.setAccumulation(new InnerJoin<Integer>());
+
+ op.setDataStorage(new InMemoryWindowedKeyedStorage<String, List<Set<Integer>>>());
+ op.setRetractionStorage(new InMemoryWindowedKeyedStorage<String, List<List<Integer>>>());
+ op.setWindowStateStorage(windowStateMap);
+
+ // Can select one of the following window options, or don't select any of them.
+ op.setWindowOption(new WindowOption.GlobalWindow());
+ //op.setWindowOption(new WindowOption.TimeWindows(Duration.millis(4000)));
+
+ op.setTriggerOption(new TriggerOption().withEarlyFiringsAtEvery(1).accumulatingFiredPanes());
+ op.setAllowedLateness(Duration.millis(500));
+
+ NumGen1 numGen1 = dag.addOperator("numGen1", new NumGen1());
+ NumGen2 numGen2 = dag.addOperator("numGen2", new NumGen2());
+
+ Collector collector = dag.addOperator("collector", new Collector());
+ ConsoleOutputOperator con = dag.addOperator("console", new ConsoleOutputOperator());
+
+ dag.addStream("num1", numGen1.output, op.input);
+ dag.addStream("num2", numGen2.output, op.input2);
+ dag.addStream("wm1", numGen1.watermarkDefaultOutputPort, op.controlInput);
+ dag.addStream("wm2", numGen2.watermarkDefaultOutputPort, op.controlInput2);
+
+ dag.addStream("MergedResult", op.output, collector.input);
+ dag.addStream("output", collector.output, con.input);
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ lma.prepareDAG(new KeyedWindowedMergeOperatorTestApplication(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.run(20000);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java
new file mode 100644
index 0000000..7dc09d0
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.accumulation.CoGroup;
+
+import com.google.common.base.Function;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Unit tests for Windowed Merge Operator
+ */
+public class WindowedMergeOperatorTest
+{
+ @Test
+ public void extractTimestampTest()
+ {
+ WindowedMergeOperatorImpl op = createDefaultWindowedMergeOperator();
+ Function<Integer, Long> timestampExtractor = new Function<Integer, Long>()
+ {
+ @Override
+ public Long apply(Integer input)
+ {
+ return (input * 10L);
+ }
+ };
+
+ Assert.assertEquals(1000L, op.extractTimestamp(new Tuple.PlainTuple<Integer>(100), timestampExtractor));
+ Assert.assertEquals(2000L, op.extractTimestamp(new Tuple.PlainTuple<Integer>(200), timestampExtractor));
+ Assert.assertEquals(200L, op.extractTimestamp(new Tuple.TimestampedTuple<Integer>(200L, 10), null));
+ }
+
+
+ @Test
+ public void windowedMergeOperatorMergeTest()
+ {
+ WindowedMergeOperatorImpl<Integer, Integer, List<Set<Integer>>, List<List<Integer>>> op = createDefaultWindowedMergeOperator();
+ Window global = Window.GlobalWindow.INSTANCE;
+ op.setDataStorage(new InMemoryWindowedStorage<List<Set<Integer>>>());
+ op.setWindowOption(new WindowOption.GlobalWindow());
+ op.initializeWindowStates(AbstractWindowedOperator.GLOBAL_WINDOW_SINGLETON_SET);
+
+ op.processTuple(new Tuple.WindowedTuple<Integer>(global, 100));
+ Assert.assertEquals(1, op.dataStorage.get(global).get(0).size());
+ op.processTuple2(new Tuple.WindowedTuple<Integer>(global, 200));
+ Assert.assertEquals(1, op.dataStorage.get(global).get(1).size());
+ op.processTuple(new Tuple.WindowedTuple<Integer>(global, 300));
+ Assert.assertEquals(2, op.dataStorage.get(global).get(0).size());
+ Assert.assertEquals(2, op.accumulation.getOutput(op.dataStorage.get(global)).size());
+ }
+
+ @Test
+ public void keyedWindowedMergeOperatorMergeTest()
+ {
+ KeyedWindowedMergeOperatorImpl<String, Integer, Integer, List<Set<Integer>>, List<List<Integer>>> op
+ = createDefaultKeyedWindowedMergeOperator();
+ Window global = Window.GlobalWindow.INSTANCE;
+ op.setDataStorage(new InMemoryWindowedKeyedStorage<String, List<Set<Integer>>>());
+ op.setWindowOption(new WindowOption.GlobalWindow());
+ op.initializeWindowStates(AbstractWindowedOperator.GLOBAL_WINDOW_SINGLETON_SET);
+
+ op.processTuple(new Tuple.WindowedTuple<KeyValPair<String, Integer>>(global, new KeyValPair<String, Integer>("A", 100)));
+ Assert.assertEquals(1, op.dataStorage.get(global, "A").get(0).size());
+ Assert.assertTrue(op.dataStorage.get(global, "A").get(0).contains(100));
+
+ op.processTuple2(new Tuple.WindowedTuple<KeyValPair<String, Integer>>(global, new KeyValPair<String, Integer>("A", 200)));
+ Assert.assertEquals(1, op.dataStorage.get(global, "A").get(1).size());
+ Assert.assertTrue(op.dataStorage.get(global, "A").get(1).contains(200));
+
+ op.processTuple2(new Tuple.WindowedTuple<KeyValPair<String, Integer>>(global, new KeyValPair<String, Integer>("B", 300)));
+ Assert.assertEquals(1, op.dataStorage.get(global, "A").get(1).size());
+ Assert.assertEquals(1, op.dataStorage.get(global, "B").get(1).size());
+ Assert.assertTrue(op.dataStorage.get(global, "B").get(1).contains(300));
+
+ Assert.assertEquals(2, op.accumulation.getOutput(op.dataStorage.get(global, "A")).size());
+ }
+
+ @Test
+ public void windowedMergeOperatorWatermarkTest()
+ {
+ WindowedMergeOperatorImpl op = createDefaultWindowedMergeOperator();
+ CollectorTestSink<WatermarkImpl> sink = new CollectorTestSink<>();
+ op.controlOutput.setSink(sink);
+
+ // No watermark is generated if the Merge operator hasn't seen a watermark from every input stream.
+ op.controlInput.process(new WatermarkImpl(1000000));
+ op.endWindow();
+ Assert.assertEquals(-1, op.currentWatermark);
+ Assert.assertEquals(0, sink.collectedTuples.size());
+
+ // Once both input streams have sent watermarks to the Merge operator, it should generate one and send it downstream.
+ op.controlInput2.process(new WatermarkImpl(200000));
+ op.endWindow();
+ Assert.assertEquals(200000, op.currentWatermark);
+ Assert.assertEquals(1, sink.collectedTuples.size());
+
+ // If the minimum of the latest input watermarks changes, the Merge operator should also generate a new watermark.
+ op.controlInput2.process(new WatermarkImpl(2100000));
+ op.endWindow();
+ Assert.assertEquals(1000000, op.currentWatermark);
+ Assert.assertEquals(2, sink.collectedTuples.size());
+
+ // The current watermark can only change during endWindow(): until then it must still hold the previous value.
+ op.controlInput.process(new WatermarkImpl(1100000));
+ Assert.assertEquals(1000000, op.currentWatermark);
+ op.endWindow();
+ Assert.assertEquals(3, sink.collectedTuples.size());
+
+ // If the upstreams sent a watermark but the minimum of the latest input watermarks doesn't change, the Merge
+ // operator should not generate a new watermark, thus nothing will be sent to downstream.
+ op.controlInput.process(new WatermarkImpl(1100000));
+ op.endWindow();
+ Assert.assertEquals(1100000, op.currentWatermark);
+ Assert.assertEquals(3, sink.collectedTuples.size());
+ }
+
+ private WindowedMergeOperatorImpl<Integer, Integer, List<Set<Integer>>, List<List<Integer>>> createDefaultWindowedMergeOperator()
+ {
+ WindowedMergeOperatorImpl<Integer, Integer, List<Set<Integer>>, List<List<Integer>>> windowedMergeOperator = new WindowedMergeOperatorImpl<>();
+ windowedMergeOperator.setDataStorage(new InMemoryWindowedStorage<List<Set<Integer>>>());
+ windowedMergeOperator.setRetractionStorage(new InMemoryWindowedStorage<List<List<Integer>>>());
+ windowedMergeOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
+ windowedMergeOperator.setAccumulation(new CoGroup<Integer>());
+ return windowedMergeOperator;
+ }
+
+ private KeyedWindowedMergeOperatorImpl<String, Integer, Integer, List<Set<Integer>>, List<List<Integer>>> createDefaultKeyedWindowedMergeOperator()
+ {
+ KeyedWindowedMergeOperatorImpl<String, Integer, Integer, List<Set<Integer>>, List<List<Integer>>> windowedMergeOperator = new KeyedWindowedMergeOperatorImpl<>();
+ windowedMergeOperator.setDataStorage(new InMemoryWindowedKeyedStorage<String, List<Set<Integer>>>());
+ windowedMergeOperator.setRetractionStorage(new InMemoryWindowedKeyedStorage<String, List<List<Integer>>>());
+ windowedMergeOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
+ windowedMergeOperator.setAccumulation(new CoGroup<Integer>());
+ return windowedMergeOperator;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/92bd7323/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTestApplication.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTestApplication.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTestApplication.java
new file mode 100644
index 0000000..5d80388
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTestApplication.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.apex.malhar.lib.window.accumulation.CoGroup;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+/**
+ * Example application to show usage of Windowed Merge Operator. Two generators send out two streams of integers,
+ * the Merge operator does a co-group to merge all incoming tuples and sends the output to the collector and then to the console.
+ * User can choose different window options and see how the application behaves.
+ */
+public class WindowedMergeOperatorTestApplication implements StreamingApplication
+{
+ private static WindowedStorage.WindowedPlainStorage<WindowState> windowStateMap = new InMemoryWindowedStorage<>();
+ private static final long windowDuration = 1000;
+
+
+ public static Window.TimeWindow assignTestWindow(long timestamp)
+ {
+ long beginTimestamp = timestamp - timestamp % windowDuration;
+ Window.TimeWindow window = new Window.TimeWindow(beginTimestamp, windowDuration);
+ if (!windowStateMap.containsWindow(window)) {
+ windowStateMap.put(window, new WindowState());
+ }
+ return window;
+ }
+
+ public static class NumGen1 extends BaseOperator implements InputOperator
+ {
+ private int i;
+ private long watermarkTime;
+ private long startingTime;
+
+ public final transient DefaultOutputPort<Tuple.WindowedTuple<Integer>> output = new DefaultOutputPort<>();
+ public final transient DefaultOutputPort<ControlTuple> watermarkDefaultOutputPort = new DefaultOutputPort<>();
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ startingTime = System.currentTimeMillis();
+ watermarkTime = System.currentTimeMillis() + 10000;
+ i = 1;
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ while (i <= 20) {
+ if (System.currentTimeMillis() - startingTime >= (i + 1) * 400) {
+ output.emit(new Tuple.WindowedTuple<Integer>(assignTestWindow(System.currentTimeMillis()), i));
+ i++;
+ }
+ }
+ }
+
+ @Override
+ public void endWindow()
+ {
+ if (i <= 20) {
+ watermarkDefaultOutputPort.emit(new WatermarkImpl(watermarkTime));
+ }
+ }
+ }
+
+ public static class NumGen2 extends BaseOperator implements InputOperator
+ {
+ private int i;
+ private long watermarkTime;
+ private long startingTime;
+
+ public final transient DefaultOutputPort<Tuple.WindowedTuple<Integer>> output = new DefaultOutputPort<>();
+ public final transient DefaultOutputPort<ControlTuple> watermarkDefaultOutputPort = new DefaultOutputPort<>();
+
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ startingTime = System.currentTimeMillis();
+ watermarkTime = System.currentTimeMillis() + 10000;
+ i = 1;
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ while (i <= 20) {
+ if (System.currentTimeMillis() - startingTime >= (i + 1) * 400) {
+ output.emit(new Tuple.WindowedTuple<Integer>(assignTestWindow(System.currentTimeMillis()), 10 * i));
+ i++;
+ }
+ }
+ }
+
+ @Override
+ public void endWindow()
+ {
+ if (i <= 20) {
+ watermarkDefaultOutputPort.emit(new WatermarkImpl(watermarkTime));
+ }
+ }
+ }
+
+ public static class Collector extends BaseOperator
+ {
+ public static List<List<List<Integer>>> result = new ArrayList<>();
+
+ public final transient DefaultOutputPort<Tuple<List<List<Integer>>>> output = new DefaultOutputPort<>();
+
+ public final transient DefaultInputPort<Tuple<List<List<Integer>>>> input = new DefaultInputPort<Tuple<List<List<Integer>>>>()
+ {
+ @Override
+ public void process(Tuple<List<List<Integer>>> tuple)
+ {
+ result.add(tuple.getValue());
+ output.emit(tuple);
+ }
+ };
+ }
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ WindowedMergeOperatorImpl<Integer, Integer, List<Set<Integer>>, List<List<Integer>>> op
+ = dag.addOperator("Merge", new WindowedMergeOperatorImpl<Integer, Integer, List<Set<Integer>>, List<List<Integer>>>());
+ op.setAccumulation(new CoGroup<Integer>());
+ op.setDataStorage(new InMemoryWindowedStorage<List<Set<Integer>>>());
+ op.setRetractionStorage(new InMemoryWindowedStorage<List<List<Integer>>>());
+ op.setWindowStateStorage(windowStateMap);
+
+ // Can select one of the following window options, or don't select any of them.
+ //op.setWindowOption(new WindowOption.GlobalWindow());
+ op.setWindowOption(new WindowOption.TimeWindows(Duration.millis(2000)));
+
+ op.setTriggerOption(new TriggerOption().withEarlyFiringsAtEvery(1).accumulatingFiredPanes());
+ op.setAllowedLateness(Duration.millis(500));
+
+ NumGen1 numGen1 = dag.addOperator("numGen1", new NumGen1());
+ NumGen2 numGen2 = dag.addOperator("numGen2", new NumGen2());
+
+ Collector collector = dag.addOperator("collector", new Collector());
+ ConsoleOutputOperator con = dag.addOperator("console", new ConsoleOutputOperator());
+
+ dag.addStream("num1", numGen1.output, op.input);
+ dag.addStream("num2", numGen2.output, op.input2);
+ dag.addStream("wm1", numGen1.watermarkDefaultOutputPort, op.controlInput);
+ dag.addStream("wm2", numGen2.watermarkDefaultOutputPort, op.controlInput2);
+
+ dag.addStream("MergedResult", op.output, collector.input);
+ dag.addStream("output", collector.output, con.input);
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ lma.prepareDAG(new WindowedMergeOperatorTestApplication(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.run(20000);
+ }
+}
[2/2] apex-malhar git commit: Introduced
WindowedMergeOperatorFeatures classes to solve the problem of code
duplication
Posted by th...@apache.org.
Introduced WindowedMergeOperatorFeatures classes to solve the problem of code duplication
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/aeb10f33
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/aeb10f33
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/aeb10f33
Branch: refs/heads/master
Commit: aeb10f33d54b6b661cb4f776a4cad0e41d5375c3
Parents: 92bd732
Author: David Yan <da...@datatorrent.com>
Authored: Mon Sep 26 16:47:11 2016 -0700
Committer: Shunxin <lu...@hotmail.com>
Committed: Mon Oct 10 19:07:25 2016 -0700
----------------------------------------------------------------------
.../malhar/lib/window/MergeAccumulation.java | 2 +-
.../lib/window/MergeWindowedOperator.java | 12 +-
.../impl/AbstractWindowedMergeOperator.java | 123 ------------
.../window/impl/AbstractWindowedOperator.java | 34 +++-
.../impl/KeyedWindowedMergeOperatorImpl.java | 105 +++++------
.../impl/WindowedMergeOperatorFeatures.java | 185 +++++++++++++++++++
.../window/impl/WindowedMergeOperatorImpl.java | 106 ++++++-----
.../window/impl/WindowedMergeOperatorTest.java | 2 +-
8 files changed, 320 insertions(+), 249 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/aeb10f33/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java b/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java
index 71f4408..53cfd40 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java
@@ -21,7 +21,7 @@ package org.apache.apex.malhar.lib.window;
import org.apache.hadoop.classification.InterfaceStability;
/**
- * This is the interface for accumulation when joining multiple streams.
+ * This is the interface for accumulation when joining two streams.
*
* @since 3.5.0
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/aeb10f33/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java
index 89a70a4..1561caa 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java
@@ -19,12 +19,22 @@
package org.apache.apex.malhar.lib.window;
/**
- * Interface for Join Windowed Operator.
+ * Interface for Merge Windowed Operator.
*/
public interface MergeWindowedOperator<InputT1, InputT2>
extends WindowedOperator<InputT1>
{
+ /**
+ * The method to accumulate the data tuple from the 2nd input stream
+ *
+ * @param tuple the data tuple
+ */
void accumulateTuple2(Tuple.WindowedTuple<InputT2> tuple);
+ /**
+ * The method to process the watermark tuple from the 2nd input stream
+ *
+ * @param watermark the watermark tuple
+ */
void processWatermark2(ControlTuple.Watermark watermark);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/aeb10f33/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedMergeOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedMergeOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedMergeOperator.java
deleted file mode 100644
index 05a2495..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedMergeOperator.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.apex.malhar.lib.window.ControlTuple;
-import org.apache.apex.malhar.lib.window.MergeAccumulation;
-import org.apache.apex.malhar.lib.window.MergeWindowedOperator;
-import org.apache.apex.malhar.lib.window.Tuple;
-import org.apache.apex.malhar.lib.window.WindowedStorage;
-
-import com.google.common.base.Function;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-
-
-/**
- * Abstract Windowed Merge Operator.
- */
-public abstract class AbstractWindowedMergeOperator<InputT1, InputT2, OutputT, DataStorageT extends WindowedStorage,
- RetractionStorageT extends WindowedStorage, AccumulationT extends
- MergeAccumulation>
- extends AbstractWindowedOperator<InputT1, OutputT, DataStorageT, RetractionStorageT, AccumulationT>
- implements MergeWindowedOperator<InputT1, InputT2>
-{
-
- private static final transient Logger LOG = LoggerFactory.getLogger(AbstractWindowedMergeOperator.class);
- private Function<InputT2, Long> timestampExtractor2;
-
- private long latestWatermark1 = -1; // latest watermark from stream 1
- private long latestWatermark2 = -1; // latest watermark from stream 2
-
- public final transient DefaultInputPort<Tuple<InputT2>> input2 = new DefaultInputPort<Tuple<InputT2>>()
- {
- @Override
- public void process(Tuple<InputT2> tuple)
- {
- processTuple2(tuple);
- }
- };
-
- // TODO: This port should be removed when Apex Core has native support for custom control tuples
- @InputPortFieldAnnotation(optional = true)
- public final transient DefaultInputPort<ControlTuple> controlInput2 = new DefaultInputPort<ControlTuple>()
- {
- @Override
- public void process(ControlTuple tuple)
- {
- if (tuple instanceof ControlTuple.Watermark) {
- processWatermark2((ControlTuple.Watermark)tuple);
- }
- }
- };
-
- public void processTuple2(Tuple<InputT2> tuple)
- {
- long timestamp = extractTimestamp(tuple, timestampExtractor2);
- if (isTooLate(timestamp)) {
- dropTuple(tuple);
- } else {
- Tuple.WindowedTuple<InputT2> windowedTuple = getWindowedValueWithTimestamp(tuple, timestamp);
- // do the accumulation
- accumulateTuple2(windowedTuple);
- processWindowState(windowedTuple);
- }
- }
-
- public void setTimestampExtractor2(Function<InputT2, Long> timestampExtractor2)
- {
- this.timestampExtractor2 = timestampExtractor2;
- }
-
-
- @Override
- public void processWatermark(ControlTuple.Watermark watermark)
- {
- latestWatermark1 = watermark.getTimestamp();
- if (latestWatermark1 >= 0 && latestWatermark2 >= 0) {
- // Select the smallest timestamp of the latest watermarks as the watermark of the operator.
- long minWatermark = Math.min(latestWatermark1, latestWatermark2);
- if (this.watermarkTimestamp < 0 || minWatermark != this.watermarkTimestamp) {
- this.watermarkTimestamp = minWatermark;
- }
- }
- }
-
- @Override
- public void processWatermark2(ControlTuple.Watermark watermark)
- {
- latestWatermark2 = watermark.getTimestamp();
- if (latestWatermark1 >= 0 && latestWatermark2 >= 0) {
- long minWatermark = Math.min(latestWatermark1, latestWatermark2);
- if (this.watermarkTimestamp < 0 || minWatermark != this.watermarkTimestamp) {
- this.watermarkTimestamp = minWatermark;
- }
- }
- }
-
- @Override
- protected void processWatermarkAtEndWindow()
- {
- if (fixedWatermarkMillis > 0 || this.watermarkTimestamp != this.currentWatermark) {
- super.processWatermarkAtEndWindow();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/aeb10f33/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
index f965a01..e8ff622 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
@@ -79,8 +79,8 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
private Function<InputT, Long> timestampExtractor;
+ protected long nextWatermark = -1;
protected long currentWatermark = -1;
- protected long watermarkTimestamp = -1;
private boolean triggerAtWatermark;
protected long earlyTriggerCount;
private long earlyTriggerMillis;
@@ -141,7 +141,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
if (isTooLate(timestamp)) {
dropTuple(tuple);
} else {
- Tuple.WindowedTuple<InputT> windowedTuple = getWindowedValue(tuple);
+ Tuple.WindowedTuple<InputT> windowedTuple = getWindowedValueWithTimestamp(tuple, timestamp);
// do the accumulation
accumulateTuple(windowedTuple);
processWindowState(windowedTuple);
@@ -256,6 +256,11 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
this.timestampExtractor = timestampExtractor;
}
+ public void setNextWatermark(long timestamp)
+ {
+ this.nextWatermark = timestamp;
+ }
+
/**
* Sets the fixed watermark with respect to the processing time derived from the Apex window ID. This is useful if we
* don't have watermark tuples from upstream. However, using this means whether a tuple is considered late totally
@@ -408,7 +413,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
@Override
public void processWatermark(ControlTuple.Watermark watermark)
{
- this.watermarkTimestamp = watermark.getTimestamp();
+ this.nextWatermark = watermark.getTimestamp();
}
@Override
@@ -460,7 +465,6 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
} else {
currentDerivedTimestamp += timeIncrement;
}
- watermarkTimestamp = -1;
}
/**
@@ -484,18 +488,17 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
protected void processWatermarkAtEndWindow()
{
if (fixedWatermarkMillis > 0) {
- watermarkTimestamp = currentDerivedTimestamp - fixedWatermarkMillis;
+ nextWatermark = currentDerivedTimestamp - fixedWatermarkMillis;
}
- if (watermarkTimestamp > 0) {
- this.currentWatermark = watermarkTimestamp;
+ if (nextWatermark > 0 && currentWatermark < nextWatermark) {
- long horizon = watermarkTimestamp - allowedLatenessMillis;
+ long horizon = nextWatermark - allowedLatenessMillis;
for (Iterator<Map.Entry<Window, WindowState>> it = windowStateMap.entries().iterator(); it.hasNext(); ) {
Map.Entry<Window, WindowState> entry = it.next();
Window window = entry.getKey();
WindowState windowState = entry.getValue();
- if (window.getBeginTimestamp() + window.getDurationMillis() < watermarkTimestamp) {
+ if (window.getBeginTimestamp() + window.getDurationMillis() < nextWatermark) {
// watermark has not arrived for this window before, marking this window late
if (windowState.watermarkArrivalTime == -1) {
windowState.watermarkArrivalTime = currentDerivedTimestamp;
@@ -514,7 +517,8 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
}
}
}
- controlOutput.emit(new WatermarkImpl(watermarkTimestamp));
+ controlOutput.emit(new WatermarkImpl(nextWatermark));
+ this.currentWatermark = nextWatermark;
}
}
@@ -552,6 +556,16 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
}
}
+ DataStorageT getDataStorage()
+ {
+ return dataStorage;
+ }
+
+ AccumulationT getAccumulation()
+ {
+ return accumulation;
+ }
+
/**
* This method fires the normal trigger for the given window.
*
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/aeb10f33/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java
index a5f17c5..3714d6d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java
@@ -18,14 +18,14 @@
*/
package org.apache.apex.malhar.lib.window.impl;
-import java.util.Map;
-
-import org.apache.apex.malhar.lib.window.MergeAccumulation;
-import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.MergeWindowedOperator;
import org.apache.apex.malhar.lib.window.Tuple;
-import org.apache.apex.malhar.lib.window.Window;
-import org.apache.apex.malhar.lib.window.WindowedStorage;
+import com.google.common.base.Function;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.lib.util.KeyValPair;
@@ -40,81 +40,68 @@ import com.datatorrent.lib.util.KeyValPair;
* @param <OutputT> The type of the value of the keyed output tuple.
*/
public class KeyedWindowedMergeOperatorImpl<KeyT, InputT1, InputT2, AccumT, OutputT>
- extends AbstractWindowedMergeOperator<KeyValPair<KeyT, InputT1>, KeyValPair<KeyT, InputT2>, KeyValPair<KeyT, OutputT>, WindowedStorage.WindowedKeyedStorage<KeyT, AccumT>, WindowedStorage.WindowedKeyedStorage<KeyT, OutputT>, MergeAccumulation<InputT1, InputT2, AccumT, OutputT>>
+ extends KeyedWindowedOperatorImpl<KeyT, InputT1, AccumT, OutputT>
+ implements MergeWindowedOperator<KeyValPair<KeyT, InputT1>, KeyValPair<KeyT, InputT2>>
{
- // TODO: Add session window support.
+ private Function<KeyValPair<KeyT, InputT2>, Long> timestampExtractor2;
+
+ private WindowedMergeOperatorFeatures.Keyed joinFeatures = new WindowedMergeOperatorFeatures.Keyed(this);
- private abstract class AccumFunction<T>
+ public final transient DefaultInputPort<Tuple<KeyValPair<KeyT, InputT2>>> input2 = new DefaultInputPort<Tuple<KeyValPair<KeyT, InputT2>>>()
{
- abstract AccumT accumulate(AccumT accum, T value);
- }
+ @Override
+ public void process(Tuple<KeyValPair<KeyT, InputT2>> tuple)
+ {
+ processTuple2(tuple);
+ }
+ };
- private <T> void accumulateTupleHelper(Tuple.WindowedTuple<KeyValPair<KeyT, T>> tuple, AccumFunction<T> accumFn)
+ // TODO: This port should be removed when Apex Core has native support for custom control tuples
+ @InputPortFieldAnnotation(optional = true)
+ public final transient DefaultInputPort<ControlTuple> controlInput2 = new DefaultInputPort<ControlTuple>()
{
- final KeyValPair<KeyT, T> kvData = tuple.getValue();
- KeyT key = kvData.getKey();
- for (Window window : tuple.getWindows()) {
- // process each window
- AccumT accum = dataStorage.get(window, key);
- if (accum == null) {
- accum = accumulation.defaultAccumulatedValue();
+ @Override
+ public void process(ControlTuple tuple)
+ {
+ if (tuple instanceof ControlTuple.Watermark) {
+ processWatermark2((ControlTuple.Watermark)tuple);
}
- dataStorage.put(window, key, accumFn.accumulate(accum, kvData.getValue()));
}
+ };
+
+ public void setTimestampExtractor2(Function<KeyValPair<KeyT, InputT2>, Long> timestampExtractor)
+ {
+ this.timestampExtractor2 = timestampExtractor;
}
- @Override
- public void accumulateTuple(Tuple.WindowedTuple<KeyValPair<KeyT, InputT1>> tuple)
+ public void processTuple2(Tuple<KeyValPair<KeyT, InputT2>> tuple)
{
- accumulateTupleHelper(tuple, new AccumFunction<InputT1>()
- {
- @Override
- AccumT accumulate(AccumT accum, InputT1 value)
- {
- return accumulation.accumulate(accum, value);
- }
- });
+ long timestamp = extractTimestamp(tuple, this.timestampExtractor2);
+ if (isTooLate(timestamp)) {
+ dropTuple(tuple);
+ } else {
+ Tuple.WindowedTuple<KeyValPair<KeyT, InputT2>> windowedTuple = getWindowedValueWithTimestamp(tuple, timestamp);
+ // do the accumulation
+ accumulateTuple2(windowedTuple);
+ processWindowState(windowedTuple);
+ }
}
@Override
public void accumulateTuple2(Tuple.WindowedTuple<KeyValPair<KeyT, InputT2>> tuple)
{
- accumulateTupleHelper(tuple, new AccumFunction<InputT2>()
- {
- @Override
- AccumT accumulate(AccumT accum, InputT2 value)
- {
- return accumulation.accumulate2(accum, value);
- }
- });
+ joinFeatures.accumulateTuple2(tuple);
}
@Override
- public void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes)
+ public void processWatermark(ControlTuple.Watermark watermark)
{
- for (Map.Entry<KeyT, AccumT> entry : dataStorage.entries(window)) {
- OutputT outputVal = accumulation.getOutput(entry.getValue());
- if (fireOnlyUpdatedPanes) {
- OutputT oldValue = retractionStorage.get(window, entry.getKey());
- if (oldValue != null && oldValue.equals(outputVal)) {
- continue;
- }
- }
- output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), outputVal)));
- if (retractionStorage != null) {
- retractionStorage.put(window, entry.getKey(), outputVal);
- }
- }
+ joinFeatures.processWatermark1(watermark);
}
@Override
- public void fireRetractionTrigger(Window window, boolean firingOnlyUpdatedPanes)
+ public void processWatermark2(ControlTuple.Watermark watermark)
{
- if (triggerOption.getAccumulationMode() != TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
- throw new UnsupportedOperationException();
- }
- for (Map.Entry<KeyT, OutputT> entry : retractionStorage.entries(window)) {
- output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), accumulation.getRetraction(entry.getValue()))));
- }
+ joinFeatures.processWatermark2(watermark);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/aeb10f33/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorFeatures.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorFeatures.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorFeatures.java
new file mode 100644
index 0000000..3fceb06
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorFeatures.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.MergeAccumulation;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * This class provides the features of a MergeWindowedOperator and is intended to be used only
+ * by implementations of such an operator.
+ */
+abstract class WindowedMergeOperatorFeatures<InputT1, InputT2, AccumT, AccumulationT extends MergeAccumulation, DataStorageT extends WindowedStorage>
+{
+ protected AbstractWindowedOperator<InputT1, ?, DataStorageT, ?, AccumulationT> operator;
+
+ protected long latestWatermark1 = -1; // latest watermark from stream 1
+ protected long latestWatermark2 = -1; // latest watermark from stream 2
+
+ protected abstract class AccumFunction<T>
+ {
+ abstract AccumT accumulate(AccumT accum, T value);
+ }
+
+ protected WindowedMergeOperatorFeatures()
+ {
+ // for kryo
+ }
+
+ WindowedMergeOperatorFeatures(AbstractWindowedOperator<InputT1, ?, DataStorageT, ?, AccumulationT> operator)
+ {
+ this.operator = operator;
+ }
+
+ abstract void accumulateTuple1(Tuple.WindowedTuple<InputT1> tuple);
+
+ abstract void accumulateTuple2(Tuple.WindowedTuple<InputT2> tuple);
+
+ void processWatermark1(ControlTuple.Watermark watermark)
+ {
+ latestWatermark1 = watermark.getTimestamp();
+ // Use the smaller of the two streams' latest watermarks as the next watermark of the operator.
+ long minWatermark = Math.min(latestWatermark1, latestWatermark2);
+ operator.setNextWatermark(minWatermark);
+ }
+
+ void processWatermark2(ControlTuple.Watermark watermark)
+ {
+ latestWatermark2 = watermark.getTimestamp();
+ long minWatermark = Math.min(latestWatermark1, latestWatermark2);
+ operator.setNextWatermark(minWatermark);
+ }
+
+ /**
+ * The merge features for a plain (non-keyed) operator.
+ */
+ static class Plain<InputT1, InputT2, AccumT, AccumulationT extends MergeAccumulation<InputT1, InputT2, AccumT, ?>, DataStorageT extends WindowedStorage.WindowedPlainStorage<AccumT>>
+ extends WindowedMergeOperatorFeatures<InputT1, InputT2, AccumT, AccumulationT, DataStorageT>
+ {
+ private Plain()
+ {
+ // for kryo
+ }
+
+ Plain(AbstractWindowedOperator<InputT1, ?, DataStorageT, ?, AccumulationT> operator)
+ {
+ super(operator);
+ }
+
+ private <T> void accumulateTupleHelper(Tuple.WindowedTuple<T> tuple, AccumFunction<T> accumFn)
+ {
+ for (Window window : tuple.getWindows()) {
+ // process each window
+ AccumT accum = operator.getDataStorage().get(window);
+ if (accum == null) {
+ accum = operator.getAccumulation().defaultAccumulatedValue();
+ }
+ operator.getDataStorage().put(window, accumFn.accumulate(accum, tuple.getValue()));
+ }
+ }
+
+ @Override
+ void accumulateTuple1(Tuple.WindowedTuple<InputT1> tuple)
+ {
+ accumulateTupleHelper(tuple, new AccumFunction<InputT1>()
+ {
+ @Override
+ AccumT accumulate(AccumT accum, InputT1 value)
+ {
+ return operator.getAccumulation().accumulate(accum, value);
+ }
+ });
+ }
+
+ @Override
+ void accumulateTuple2(Tuple.WindowedTuple<InputT2> tuple)
+ {
+ accumulateTupleHelper(tuple, new AccumFunction<InputT2>()
+ {
+ @Override
+ AccumT accumulate(AccumT accum, InputT2 value)
+ {
+ return operator.getAccumulation().accumulate2(accum, value);
+ }
+ });
+ }
+ }
+
+ /**
+ * The merge features for a keyed operator.
+ */
+ static class Keyed<KeyT, InputT1, InputT2, AccumT, AccumulationT extends MergeAccumulation<InputT1, InputT2, AccumT, ?>, DataStorageT extends WindowedStorage.WindowedKeyedStorage<KeyT, AccumT>>
+ extends WindowedMergeOperatorFeatures<KeyValPair<KeyT, InputT1>, KeyValPair<KeyT, InputT2>, AccumT, AccumulationT, DataStorageT>
+ {
+ private Keyed()
+ {
+ // for kryo
+ }
+
+ Keyed(AbstractWindowedOperator<KeyValPair<KeyT, InputT1>, ?, DataStorageT, ?, AccumulationT> operator)
+ {
+ super(operator);
+ }
+
+ private <T> void accumulateTupleHelper(Tuple.WindowedTuple<KeyValPair<KeyT, T>> tuple, AccumFunction<T> accumFn)
+ {
+ final KeyValPair<KeyT, T> kvData = tuple.getValue();
+ KeyT key = kvData.getKey();
+ for (Window window : tuple.getWindows()) {
+ // process each window
+ AccumT accum = operator.getDataStorage().get(window, key);
+ if (accum == null) {
+ accum = operator.getAccumulation().defaultAccumulatedValue();
+ }
+ operator.getDataStorage().put(window, key, accumFn.accumulate(accum, kvData.getValue()));
+ }
+ }
+
+ @Override
+ void accumulateTuple1(Tuple.WindowedTuple<KeyValPair<KeyT, InputT1>> tuple)
+ {
+ accumulateTupleHelper(tuple, new AccumFunction<InputT1>()
+ {
+ @Override
+ AccumT accumulate(AccumT accum, InputT1 value)
+ {
+ return operator.getAccumulation().accumulate(accum, value);
+ }
+ });
+ }
+
+ @Override
+ void accumulateTuple2(Tuple.WindowedTuple<KeyValPair<KeyT, InputT2>> tuple)
+ {
+ accumulateTupleHelper(tuple, new AccumFunction<InputT2>()
+ {
+ @Override
+ AccumT accumulate(AccumT accum, InputT2 value)
+ {
+ return operator.getAccumulation().accumulate2(accum, value);
+ }
+ });
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/aeb10f33/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java
index 38eeff0..0f8a762 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java
@@ -18,11 +18,14 @@
*/
package org.apache.apex.malhar.lib.window.impl;
-import org.apache.apex.malhar.lib.window.MergeAccumulation;
-import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.MergeWindowedOperator;
import org.apache.apex.malhar.lib.window.Tuple;
-import org.apache.apex.malhar.lib.window.Window;
-import org.apache.apex.malhar.lib.window.WindowedStorage;
+
+import com.google.common.base.Function;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
/**
* Windowed Merge Operator to merge two streams together. It aggregates tuple from two
@@ -35,78 +38,73 @@ import org.apache.apex.malhar.lib.window.WindowedStorage;
* @param <OutputT> The type of output tuple.
*/
public class WindowedMergeOperatorImpl<InputT1, InputT2, AccumT, OutputT>
- extends AbstractWindowedMergeOperator<InputT1, InputT2, OutputT, WindowedStorage.WindowedPlainStorage<AccumT>, WindowedStorage.WindowedPlainStorage<OutputT>, MergeAccumulation<InputT1, InputT2, AccumT, OutputT>>
+ extends WindowedOperatorImpl<InputT1, AccumT, OutputT> implements MergeWindowedOperator<InputT1, InputT2>
{
- private abstract class AccumFunction<T>
+ private Function<InputT2, Long> timestampExtractor2;
+
+ private WindowedMergeOperatorFeatures.Plain joinFeatures = new WindowedMergeOperatorFeatures.Plain(this);
+
+ public final transient DefaultInputPort<Tuple<InputT2>> input2 = new DefaultInputPort<Tuple<InputT2>>()
{
- abstract AccumT accumulate(AccumT accum, T value);
- }
+ @Override
+ public void process(Tuple<InputT2> tuple)
+ {
+ processTuple2(tuple);
+ }
+ };
- private <T> void accumulateTupleHelper(Tuple.WindowedTuple<T> tuple, AccumFunction<T> accumFn)
+ // TODO: This port should be removed when Apex Core has native support for custom control tuples
+ @InputPortFieldAnnotation(optional = true)
+ public final transient DefaultInputPort<ControlTuple> controlInput2 = new DefaultInputPort<ControlTuple>()
{
- for (Window window : tuple.getWindows()) {
- // process each window
- AccumT accum = dataStorage.get(window);
- if (accum == null) {
- accum = accumulation.defaultAccumulatedValue();
+ @Override
+ public void process(ControlTuple tuple)
+ {
+ if (tuple instanceof ControlTuple.Watermark) {
+ processWatermark2((ControlTuple.Watermark)tuple);
}
- dataStorage.put(window, accumFn.accumulate(accum, tuple.getValue()));
}
+ };
+
+ public void setTimestampExtractor2(Function<InputT2, Long> timestampExtractor)
+ {
+ this.timestampExtractor2 = timestampExtractor;
}
- @Override
- public void accumulateTuple2(Tuple.WindowedTuple<InputT2> tuple)
+ public void processTuple2(Tuple<InputT2> tuple)
{
- accumulateTupleHelper(tuple, new AccumFunction<InputT2>()
- {
- @Override
- AccumT accumulate(AccumT accum, InputT2 value)
- {
- return accumulation.accumulate2(accum, value);
- }
- });
+ long timestamp = extractTimestamp(tuple, this.timestampExtractor2);
+ if (isTooLate(timestamp)) {
+ dropTuple(tuple);
+ } else {
+ Tuple.WindowedTuple<InputT2> windowedTuple = getWindowedValueWithTimestamp(tuple, timestamp);
+ // do the accumulation
+ accumulateTuple2(windowedTuple);
+ processWindowState(windowedTuple);
+ }
}
@Override
public void accumulateTuple(Tuple.WindowedTuple<InputT1> tuple)
{
- accumulateTupleHelper(tuple, new AccumFunction<InputT1>()
- {
- @Override
- AccumT accumulate(AccumT accum, InputT1 value)
- {
- return accumulation.accumulate(accum, value);
- }
- });
+ joinFeatures.accumulateTuple1(tuple);
}
@Override
- public void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes)
+ public void accumulateTuple2(Tuple.WindowedTuple<InputT2> tuple)
{
- AccumT accumulatedValue = dataStorage.get(window);
- OutputT outputValue = accumulation.getOutput(accumulatedValue);
+ joinFeatures.accumulateTuple2(tuple);
+ }
- if (fireOnlyUpdatedPanes) {
- OutputT oldValue = retractionStorage.get(window);
- if (oldValue != null && oldValue.equals(outputValue)) {
- return;
- }
- }
- output.emit(new Tuple.WindowedTuple<>(window, outputValue));
- if (retractionStorage != null) {
- retractionStorage.put(window, outputValue);
- }
+ @Override
+ public void processWatermark(ControlTuple.Watermark watermark)
+ {
+ joinFeatures.processWatermark1(watermark);
}
@Override
- public void fireRetractionTrigger(Window window, boolean firingOnlyUpdatedPanes)
+ public void processWatermark2(ControlTuple.Watermark watermark)
{
- if (triggerOption.getAccumulationMode() != TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
- throw new UnsupportedOperationException();
- }
- OutputT oldValue = retractionStorage.get(window);
- if (oldValue != null) {
- output.emit(new Tuple.WindowedTuple<>(window, accumulation.getRetraction(oldValue)));
- }
+ joinFeatures.processWatermark2(watermark);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/aeb10f33/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java
index 7dc09d0..8c37d57 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java
@@ -130,8 +130,8 @@ public class WindowedMergeOperatorTest
// Current watermark of Merge operator could only change during endWindow() event.
op.controlInput.process(new WatermarkImpl(1100000));
- Assert.assertEquals(1100000, op.currentWatermark);
op.endWindow();
+ Assert.assertEquals(1100000, op.currentWatermark);
Assert.assertEquals(3, sink.collectedTuples.size());
// If the upstreams sent a watermark but the minimum of the latest input watermarks doesn't change, the Merge