You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/09/28 18:15:00 UTC
[04/12] flink git commit: Move window operators and tests to windowing package
Move window operators and tests to windowing package
The API package is also called windowing; this harmonizes the package
names.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05d2138f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05d2138f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05d2138f
Branch: refs/heads/master
Commit: 05d2138f081ff5fa274dab571b9327f96be693aa
Parents: a606c4a
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Sep 24 16:33:09 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 28 17:04:16 2015 +0200
----------------------------------------------------------------------
.../api/datastream/KeyedWindowDataStream.java | 2 +-
...ractAlignedProcessingTimeWindowOperator.java | 223 +++++++
.../windowing/AbstractKeyedTimePanes.java | 76 +++
.../windowing/AccumulatingKeyedTimePanes.java | 126 ++++
...ccumulatingProcessingTimeWindowOperator.java | 48 ++
.../windowing/AggregatingKeyedTimePanes.java | 103 +++
...AggregatingProcessingTimeWindowOperator.java | 47 ++
.../runtime/operators/windowing/KeyMap.java | 651 +++++++++++++++++++
.../operators/windowing/PolicyToOperator.java | 82 +++
.../operators/windowing/package-info.java | 22 +
...ractAlignedProcessingTimeWindowOperator.java | 223 -------
.../windows/AbstractKeyedTimePanes.java | 76 ---
.../windows/AccumulatingKeyedTimePanes.java | 126 ----
...ccumulatingProcessingTimeWindowOperator.java | 48 --
.../windows/AggregatingKeyedTimePanes.java | 103 ---
...AggregatingProcessingTimeWindowOperator.java | 47 --
.../runtime/operators/windows/KeyMap.java | 651 -------------------
.../operators/windows/PolicyToOperator.java | 82 ---
.../runtime/operators/windows/package-info.java | 22 -
...AlignedProcessingTimeWindowOperatorTest.java | 547 ++++++++++++++++
...AlignedProcessingTimeWindowOperatorTest.java | 550 ++++++++++++++++
.../operators/windowing/CollectingOutput.java | 80 +++
.../windowing/KeyMapPutIfAbsentTest.java | 121 ++++
.../operators/windowing/KeyMapPutTest.java | 136 ++++
.../runtime/operators/windowing/KeyMapTest.java | 344 ++++++++++
...AlignedProcessingTimeWindowOperatorTest.java | 547 ----------------
...AlignedProcessingTimeWindowOperatorTest.java | 551 ----------------
.../operators/windows/CollectingOutput.java | 80 ---
.../windows/KeyMapPutIfAbsentTest.java | 121 ----
.../operators/windows/KeyMapPutTest.java | 136 ----
.../runtime/operators/windows/KeyMapTest.java | 344 ----------
31 files changed, 3157 insertions(+), 3158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
index dfb7032..37151d7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
-import org.apache.flink.streaming.runtime.operators.windows.PolicyToOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.PolicyToOperator;
/**
* A KeyedWindowDataStream represents a data stream where elements are grouped by key, and
http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
new file mode 100644
index 0000000..6c4e53a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.commons.math3.util.ArithmeticUtils;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+
+public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT> extends AbstractStreamOperator<OUT>
+ implements OneInputStreamOperator<IN, OUT>, Triggerable {
+
+ private static final long serialVersionUID = 3245500864882459867L;
+
+ private static final long MIN_SLIDE_TIME = 50;
+
+ // ----- fields for operator parametrization -----
+
+ private final Function function;
+ private final KeySelector<IN, KEY> keySelector;
+
+ private final long windowSize;
+ private final long windowSlide;
+ private final long paneSize;
+ private final int numPanesPerWindow;
+
+ // ----- fields for operator functionality -----
+
+ private transient AbstractKeyedTimePanes<IN, KEY, ?, OUT> panes;
+
+ private transient TimestampedCollector<OUT> out;
+
+ private transient long nextEvaluationTime;
+ private transient long nextSlideTime;
+
+ protected AbstractAlignedProcessingTimeWindowOperator(
+ Function function,
+ KeySelector<IN, KEY> keySelector,
+ long windowLength,
+ long windowSlide)
+ {
+ if (function == null || keySelector == null) {
+ throw new NullPointerException();
+ }
+ if (windowLength < MIN_SLIDE_TIME) {
+ throw new IllegalArgumentException("Window length must be at least " + MIN_SLIDE_TIME + " msecs");
+ }
+ if (windowSlide < MIN_SLIDE_TIME) {
+ throw new IllegalArgumentException("Window slide must be at least " + MIN_SLIDE_TIME + " msecs");
+ }
+ if (windowLength < windowSlide) {
+ throw new IllegalArgumentException("The window size must be larger than the window slide");
+ }
+
+ final long paneSlide = ArithmeticUtils.gcd(windowLength, windowSlide);
+ if (paneSlide < MIN_SLIDE_TIME) {
+ throw new IllegalArgumentException(String.format(
+ "Cannot compute window of size %d msecs sliding by %d msecs. " +
+ "The unit of grouping is too small: %d msecs", windowLength, windowSlide, paneSlide));
+ }
+
+ this.function = function;
+ this.keySelector = keySelector;
+ this.windowSize = windowLength;
+ this.windowSlide = windowSlide;
+ this.paneSize = paneSlide;
+ this.numPanesPerWindow = MathUtils.checkedDownCast(windowLength / paneSlide);
+ }
+
+
+ protected abstract AbstractKeyedTimePanes<IN, KEY, ?, OUT> createPanes(
+ KeySelector<IN, KEY> keySelector, Function function);
+
+ // ------------------------------------------------------------------------
+ // startup and shutdown
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ out = new TimestampedCollector<>(output);
+
+ // create the panes that gather the elements per slide
+ panes = createPanes(keySelector, function);
+
+ // decide when to first compute the window and when to slide it
+ // the values should align with the start of time (that is, the UNIX epoch, not the big bang)
+ final long now = System.currentTimeMillis();
+ nextEvaluationTime = now + windowSlide - (now % windowSlide);
+ nextSlideTime = now + paneSize - (now % paneSize);
+
+ getRuntimeContext().registerTimer(Math.min(nextEvaluationTime, nextSlideTime), this);
+ }
+
+ @Override
+ public void close() throws Exception {
+ final long finalWindowTimestamp = nextEvaluationTime;
+
+ // early stop the triggering thread, so it does not attempt to return any more data
+ stopTriggers();
+
+ // emit the remaining data
+ computeWindow(finalWindowTimestamp);
+ }
+
+ @Override
+ public void dispose() {
+ // acquire the lock during shutdown, to prevent trigger calls at the same time
+ // fail-safe stop of the triggering thread (in case of an error)
+ stopTriggers();
+
+ // release the panes
+ panes.dispose();
+ }
+
+ private void stopTriggers() {
+ // reset the action timestamps. this makes sure any pending triggers will not evaluate
+ nextEvaluationTime = -1L;
+ nextSlideTime = -1L;
+ }
+
+ // ------------------------------------------------------------------------
+ // Receiving elements and triggers
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void processElement(StreamRecord<IN> element) throws Exception {
+ panes.addElementToLatestPane(element.getValue());
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) {
+ // this operator does not react to watermarks
+ }
+
+ @Override
+ public void trigger(long timestamp) throws Exception {
+ // first we check if we actually trigger the window function
+ if (timestamp == nextEvaluationTime) {
+ // compute and output the results
+ computeWindow(timestamp);
+
+ nextEvaluationTime += windowSlide;
+ }
+
+ // check if we slide the panes by one. this may happen in addition to the
+ // window computation, or just by itself
+ if (timestamp == nextSlideTime) {
+ panes.slidePanes(numPanesPerWindow);
+ nextSlideTime += paneSize;
+ }
+
+ long nextTriggerTime = Math.min(nextEvaluationTime, nextSlideTime);
+ getRuntimeContext().registerTimer(nextTriggerTime, this);
+ }
+
+ private void computeWindow(long timestamp) throws Exception {
+ out.setTimestamp(timestamp);
+ panes.truncatePanes(numPanesPerWindow);
+ panes.evaluateWindow(out);
+ }
+
+ // ------------------------------------------------------------------------
+ // Property access (for testing)
+ // ------------------------------------------------------------------------
+
+ public long getWindowSize() {
+ return windowSize;
+ }
+
+ public long getWindowSlide() {
+ return windowSlide;
+ }
+
+ public long getPaneSize() {
+ return paneSize;
+ }
+
+ public int getNumPanesPerWindow() {
+ return numPanesPerWindow;
+ }
+
+ public long getNextEvaluationTime() {
+ return nextEvaluationTime;
+ }
+
+ public long getNextSlideTime() {
+ return nextSlideTime;
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return "Window (processing time) (length=" + windowSize + ", slide=" + windowSlide + ')';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
new file mode 100644
index 0000000..fae024b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayDeque;
+
+
+public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
+
+ protected KeyMap<Key, Aggregate> latestPane = new KeyMap<>();
+
+ protected final ArrayDeque<KeyMap<Key, Aggregate>> previousPanes = new ArrayDeque<>();
+
+ // ------------------------------------------------------------------------
+
+ public abstract void addElementToLatestPane(Type element) throws Exception;
+
+ public abstract void evaluateWindow(Collector<Result> out) throws Exception;
+
+
+ public void dispose() {
+ // since all is heap data, there is no need to clean up anything
+ latestPane = null;
+ previousPanes.clear();
+ }
+
+
+ public void slidePanes(int panesToKeep) {
+ if (panesToKeep > 1) {
+ // the current pane becomes the latest previous pane
+ previousPanes.addLast(latestPane);
+
+ // truncate the history
+ while (previousPanes.size() >= panesToKeep) {
+ previousPanes.removeFirst();
+ }
+ }
+
+ // we need a new latest pane
+ latestPane = new KeyMap<>();
+ }
+
+ public void truncatePanes(int numToRetain) {
+ while (previousPanes.size() >= numToRetain) {
+ previousPanes.removeFirst();
+ }
+ }
+
+ protected void traverseAllPanes(KeyMap.TraversalEvaluator<Key, Aggregate> traversal, long traversalPass) throws Exception{
+ // gather all panes in an array (faster iterations)
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ KeyMap<Key, Aggregate>[] panes = previousPanes.toArray(new KeyMap[previousPanes.size() + 1]);
+ panes[panes.length - 1] = latestPane;
+
+ // let the maps make a coordinated traversal and evaluate the window function per contained key
+ KeyMap.traverseMaps(panes, traversal, traversalPass);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
new file mode 100644
index 0000000..d85c53e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.util.UnionIterator;
+import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+
+
+public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyedTimePanes<Type, Key, ArrayList<Type>, Result> {
+
+ private final KeySelector<Type, Key> keySelector;
+
+ private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory();
+
+ private final KeyedWindowFunction<Type, Result, Key> function;
+
+ private long evaluationPass;
+
+ // ------------------------------------------------------------------------
+
+ public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, KeyedWindowFunction<Type, Result, Key> function) {
+ this.keySelector = keySelector;
+ this.function = function;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void addElementToLatestPane(Type element) throws Exception {
+ Key k = keySelector.getKey(element);
+ ArrayList<Type> elements = latestPane.putIfAbsent(k, listFactory);
+ elements.add(element);
+ }
+
+ @Override
+ public void evaluateWindow(Collector<Result> out) throws Exception {
+ if (previousPanes.isEmpty()) {
+ // optimized path for single pane case (tumbling window)
+ for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) {
+ function.evaluate(entry.getKey(), entry.getValue(), out);
+ }
+ }
+ else {
+ // general code path for multi-pane case
+ WindowFunctionTraversal<Key, Type, Result> evaluator = new WindowFunctionTraversal<>(function, out);
+ traverseAllPanes(evaluator, evaluationPass);
+ }
+
+ evaluationPass++;
+ }
+
+ // ------------------------------------------------------------------------
+ // Running a window function in a map traversal
+ // ------------------------------------------------------------------------
+
+ static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
+
+ private final KeyedWindowFunction<Type, Result, Key> function;
+
+ private final UnionIterator<Type> unionIterator;
+
+ private final Collector<Result> out;
+
+ private Key currentKey;
+
+ WindowFunctionTraversal(KeyedWindowFunction<Type, Result, Key> function, Collector<Result> out) {
+ this.function = function;
+ this.out = out;
+ this.unionIterator = new UnionIterator<>();
+ }
+
+
+ @Override
+ public void startNewKey(Key key) {
+ unionIterator.clear();
+ currentKey = key;
+ }
+
+ @Override
+ public void nextValue(ArrayList<Type> value) {
+ unionIterator.addList(value);
+ }
+
+ @Override
+ public void keyDone() throws Exception {
+ function.evaluate(currentKey, unionIterator, out);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Lazy factory for lists (put if absent)
+ // ------------------------------------------------------------------------
+
+ @SuppressWarnings("unchecked")
+ private static <V> KeyMap.LazyFactory<ArrayList<V>> getListFactory() {
+ return (KeyMap.LazyFactory<ArrayList<V>>) LIST_FACTORY;
+ }
+
+ private static final KeyMap.LazyFactory<?> LIST_FACTORY = new KeyMap.LazyFactory<ArrayList<?>>() {
+
+ @Override
+ public ArrayList<?> create() {
+ return new ArrayList<>(4);
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
new file mode 100644
index 0000000..4df308d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+
+
+public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
+ extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT> {
+
+ private static final long serialVersionUID = 7305948082830843475L;
+
+
+ public AccumulatingProcessingTimeWindowOperator(
+ KeyedWindowFunction<IN, OUT, KEY> function,
+ KeySelector<IN, KEY> keySelector,
+ long windowLength,
+ long windowSlide)
+ {
+ super(function, keySelector, windowLength, windowSlide);
+ }
+
+ @Override
+ protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
+ @SuppressWarnings("unchecked")
+ KeyedWindowFunction<IN, OUT, KEY> windowFunction = (KeyedWindowFunction<IN, OUT, KEY>) function;
+
+ return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
new file mode 100644
index 0000000..48f4eb1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.util.Collector;
+
+
+public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes<Type, Key, Type, Type> {
+
+ private final KeySelector<Type, Key> keySelector;
+
+ private final ReduceFunction<Type> reducer;
+
+ private long evaluationPass;
+
+ // ------------------------------------------------------------------------
+
+ public AggregatingKeyedTimePanes(KeySelector<Type, Key> keySelector, ReduceFunction<Type> reducer) {
+ this.keySelector = keySelector;
+ this.reducer = reducer;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void addElementToLatestPane(Type element) throws Exception {
+ Key k = keySelector.getKey(element);
+ latestPane.putOrAggregate(k, element, reducer);
+ }
+
+ @Override
+ public void evaluateWindow(Collector<Type> out) throws Exception {
+ if (previousPanes.isEmpty()) {
+ // optimized path for single pane case
+ for (KeyMap.Entry<Key, Type> entry : latestPane) {
+ out.collect(entry.getValue());
+ }
+ }
+ else {
+ // general code path for multi-pane case
+ AggregatingTraversal<Key, Type> evaluator = new AggregatingTraversal<>(reducer, out);
+ traverseAllPanes(evaluator, evaluationPass);
+ }
+
+ evaluationPass++;
+ }
+
+ // ------------------------------------------------------------------------
+ // The maps traversal that performs the final aggregation
+ // ------------------------------------------------------------------------
+
+ static final class AggregatingTraversal<Key, Type> implements KeyMap.TraversalEvaluator<Key, Type> {
+
+ private final ReduceFunction<Type> function;
+
+ private final Collector<Type> out;
+
+ private Type currentValue;
+
+ AggregatingTraversal(ReduceFunction<Type> function, Collector<Type> out) {
+ this.function = function;
+ this.out = out;
+ }
+
+ @Override
+ public void startNewKey(Key key) {
+ currentValue = null;
+ }
+
+ @Override
+ public void nextValue(Type value) throws Exception {
+ if (currentValue != null) {
+ currentValue = function.reduce(currentValue, value);
+ }
+ else {
+ currentValue = value;
+ }
+ }
+
+ @Override
+ public void keyDone() throws Exception {
+ out.collect(currentValue);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
new file mode 100644
index 0000000..99457bb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+
+public class AggregatingProcessingTimeWindowOperator<KEY, IN>
+ extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN> {
+
+ private static final long serialVersionUID = 7305948082830843475L;
+
+
+ public AggregatingProcessingTimeWindowOperator(
+ ReduceFunction<IN> function,
+ KeySelector<IN, KEY> keySelector,
+ long windowLength,
+ long windowSlide)
+ {
+ super(function, keySelector, windowLength, windowSlide);
+ }
+
+ @Override
+ protected AggregatingKeyedTimePanes<IN, KEY> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
+ @SuppressWarnings("unchecked")
+ ReduceFunction<IN> windowFunction = (ReduceFunction<IN>) function;
+
+ return new AggregatingKeyedTimePanes<IN, KEY>(keySelector, windowFunction);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java
new file mode 100644
index 0000000..3f44c4a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java
@@ -0,0 +1,651 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.runtime.util.MathUtils;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * A special Hash Map implementation that can be traversed efficiently in sync with other
+ * hash maps.
+ * <p>
+ * The differences between this hash map and Java's "java.util.HashMap" are:
+ * <ul>
+ * <li>A different hashing scheme. This implementation uses extensible hashing, meaning that
+ * each hash table growth takes one more lower hash code bit into account, and values that were
+ * formerly in the same bucket will afterwards be in the two adjacent buckets.</li>
+ * <li>This allows an efficient traversal of multiple hash maps together, even though the maps are
+ * of different sizes.</li>
+ * <li>The map offers functions such as "putIfAbsent()" and "putOrAggregate()"</li>
+ * <li>The map supports no removal/shrinking.</li>
+ * </ul>
+ */
+public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
+
+ /** The minimum table capacity, 64 entries */
+ private static final int MIN_CAPACITY = 0x40;
+
+ /** The maximum possible table capacity, the largest positive power of
+ * two in the 32bit signed integer value range */
+ private static final int MAX_CAPACITY = 0x40000000;
+
+ /** The number of bits used for table addressing when table is at max capacity */
+ private static final int FULL_BIT_RANGE = MathUtils.log2strict(MAX_CAPACITY);
+
+ // ------------------------------------------------------------------------
+
+ /** The hash index, as an array of entries */
+ private Entry<K, V>[] table;
+
+ /** The number of bits by which the hash code is shifted right, to find the bucket */
+ private int shift;
+
+ /** The number of elements in the hash table */
+ private int numElements;
+
+ /** The number of elements above which the hash table needs to grow */
+ private int rehashThreshold;
+
+ /** The base-2 logarithm of the table capacity */
+ private int log2size;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new hash table with the default initial capacity.
+ */
+ public KeyMap() {
+ this(0);
+ }
+
+ /**
+ * Creates a new table with a capacity tailored to the given expected number of elements.
+ *
+ * @param expectedNumberOfElements The number of elements to tailor the capacity to.
+ */
+ public KeyMap(int expectedNumberOfElements) {
+ if (expectedNumberOfElements < 0) {
+ throw new IllegalArgumentException("Invalid capacity: " + expectedNumberOfElements);
+ }
+
+ // round up to the next power of two
+ // guard against too small capacity and integer overflows
+ int capacity = Integer.highestOneBit(expectedNumberOfElements) << 1;
+ capacity = capacity >= 0 ? Math.max(MIN_CAPACITY, capacity) : MAX_CAPACITY;
+
+ // this also acts as a sanity check
+ log2size = MathUtils.log2strict(capacity);
+ shift = FULL_BIT_RANGE - log2size;
+ table = allocateTable(capacity);
+ rehashThreshold = getRehashThreshold(capacity);
+ }
+
+ // ------------------------------------------------------------------------
+ // Gets and Puts
+ // ------------------------------------------------------------------------
+
+ /**
+ * Inserts the given value, mapped under the given key. If the table already contains a value for
+ * the key, the value is replaced and returned. If no value is contained, yet, the function
+ * returns null.
+ *
+ * @param key The key to insert.
+ * @param value The value to insert.
+ * @return The previously mapped value for the key, or null, if no value was mapped for the key.
+ *
+ * @throws java.lang.NullPointerException Thrown, if the key is null.
+ */
+ public final V put(K key, V value) {
+ final int hash = hash(key);
+ final int slot = indexOf (hash);
+
+ // search the chain from the slot
+ for (Entry<K, V> e = table[slot]; e != null; e = e.next) {
+ Object k;
+ if (e.hashCode == hash && ((k = e.key) == key || key.equals(k))) {
+ // found match
+ V old = e.value;
+ e.value = value;
+ return old;
+ }
+ }
+
+ // no match, insert a new value
+ insertNewEntry(hash, key, value, slot);
+ return null;
+ }
+
+ /**
+ * Inserts a value for the given key, if no value is yet contained for that key. Otherwise,
+ * returns the value currently contained for the key.
+ * <p>
+ * The value that is inserted in case that the key is not contained, yet, is lazily created
+ * using the given factory.
+ *
+ * @param key The key to insert.
+ * @param factory The factory that produces the value, if no value is contained, yet, for the key.
+ * @return The value in the map after this operation (either the previously contained value, or the
+ * newly created value).
+ *
+ * @throws java.lang.NullPointerException Thrown, if the key is null.
+ */
+ public final V putIfAbsent(K key, LazyFactory<V> factory) {
+ final int hash = hash(key);
+ final int slot = indexOf(hash);
+
+ // search the chain from the slot
+ for (Entry<K, V> entry = table[slot]; entry != null; entry = entry.next) {
+ if (entry.hashCode == hash && entry.key.equals(key)) {
+ // found match
+ return entry.value;
+ }
+ }
+
+ // no match, insert a new value
+ V value = factory.create();
+ insertNewEntry(hash, key, value, slot);
+
+ // return the created value
+ return value;
+ }
+
+ /**
+ * Inserts or aggregates a value into the hash map. If the hash map does not yet contain the key,
+ * this method inserts the value. If the table already contains the key (and a value) this
+ * method will use the given ReduceFunction function to combine the existing value and the
+ * given value to a new value, and store that value for the key.
+ *
+ * @param key The key to map the value.
+ * @param value The new value to insert, or aggregate with the existing value.
+ * @param aggregator The aggregator to use if a value is already contained.
+ *
+ * @return The value in the map after this operation: Either the given value, or the aggregated value.
+ *
+ * @throws java.lang.NullPointerException Thrown, if the key is null.
+ * @throws Exception The method forwards exceptions from the aggregation function.
+ */
+ public final V putOrAggregate(K key, V value, ReduceFunction<V> aggregator) throws Exception {
+ final int hash = hash(key);
+ final int slot = indexOf(hash);
+
+ // search the chain from the slot
+ for (Entry<K, V> entry = table[slot]; entry != null; entry = entry.next) {
+ if (entry.hashCode == hash && entry.key.equals(key)) {
+ // found match
+ entry.value = aggregator.reduce(entry.value, value);
+ return entry.value;
+ }
+ }
+
+ // no match, insert a new value
+ insertNewEntry(hash, key, value, slot);
+ // return the original value
+ return value;
+ }
+
+ /**
+ * Looks up the value mapped under the given key. Returns null if no value is mapped under this key.
+ *
+ * @param key The key to look up.
+ * @return The value associated with the key, or null, if no value is found for the key.
+ *
+ * @throws java.lang.NullPointerException Thrown, if the key is null.
+ */
+ public V get(K key) {
+ final int hash = hash(key);
+ final int slot = indexOf(hash);
+
+ // search the chain from the slot
+ for (Entry<K, V> entry = table[slot]; entry != null; entry = entry.next) {
+ if (entry.hashCode == hash && entry.key.equals(key)) {
+ return entry.value;
+ }
+ }
+
+ // not found
+ return null;
+ }
+
+ private void insertNewEntry(int hashCode, K key, V value, int position) {
+ Entry<K,V> e = table[position];
+ table[position] = new Entry<>(key, value, hashCode, e);
+ numElements++;
+
+ // rehash if necessary
+ if (numElements > rehashThreshold) {
+ growTable();
+ }
+ }
+
+ private int indexOf(int hashCode) {
+ return (hashCode >> shift) & (table.length - 1);
+ }
+
+ /**
+ * Creates an iterator over the entries of this map.
+ *
+ * @return An iterator over the entries of this map.
+ */
+ @Override
+ public Iterator<Entry<K, V>> iterator() {
+ return new Iterator<Entry<K, V>>() {
+
+ private final Entry<K, V>[] tab = KeyMap.this.table;
+
+ private Entry<K, V> nextEntry;
+
+ private int nextPos = 0;
+
+ @Override
+ public boolean hasNext() {
+ if (nextEntry != null) {
+ return true;
+ }
+ else {
+ while (nextPos < tab.length) {
+ Entry<K, V> e = tab[nextPos++];
+ if (e != null) {
+ nextEntry = e;
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+
+ @Override
+ public Entry<K, V> next() {
+ if (nextEntry != null || hasNext()) {
+ Entry<K, V> e = nextEntry;
+ nextEntry = nextEntry.next;
+ return e;
+ }
+ else {
+ throw new NoSuchElementException();
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ // ------------------------------------------------------------------------
+ // Properties
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the number of elements currently in the map.
+ * @return The number of elements currently in the map.
+ */
+ public int size() {
+ return numElements;
+ }
+
+ /**
+ * Checks whether the map is empty.
+ * @return True, if the map is empty, false otherwise.
+ */
+ public boolean isEmpty() {
+ return numElements == 0;
+ }
+
+ /**
+ * Gets the current table capacity, i.e., the number of slots in the hash table, without
+ * any overflow chaining.
+ * @return The number of slots in the hash table.
+ */
+ public int getCurrentTableCapacity() {
+ return table.length;
+ }
+
+ /**
+ * Gets the base-2 logarithm of the hash table capacity, as returned by
+ * {@link #getCurrentTableCapacity()}.
+ *
+ * @return The base-2 logarithm of the hash table capacity.
+ */
+ public int getLog2TableCapacity() {
+ return log2size;
+ }
+
+ public int getRehashThreshold() {
+ return rehashThreshold;
+ }
+
+ public int getShift() {
+ return shift;
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ @SuppressWarnings("unchecked")
+ private Entry<K, V>[] allocateTable(int numElements) {
+ return (Entry<K, V>[]) new Entry<?, ?>[numElements];
+ }
+
+ private void growTable() {
+ final int newSize = table.length << 1;
+
+ // only grow if there is still space to grow the table
+ if (newSize > 0) {
+ final Entry<K, V>[] oldTable = this.table;
+ final Entry<K, V>[] newTable = allocateTable(newSize);
+
+ final int newShift = shift - 1;
+ final int newMask = newSize - 1;
+
+ // go over all slots from the table. since we hash to adjacent positions in
+ // the new hash table, this is actually cache efficient
+ for (Entry<K, V> entry : oldTable) {
+ // traverse the chain for each slot
+ while (entry != null) {
+ final int newPos = (entry.hashCode >> newShift) & newMask;
+ Entry<K, V> nextEntry = entry.next;
+ entry.next = newTable[newPos];
+ newTable[newPos] = entry;
+ entry = nextEntry;
+ }
+ }
+
+ this.table = newTable;
+ this.shift = newShift;
+ this.rehashThreshold = getRehashThreshold(newSize);
+ this.log2size += 1;
+ }
+ }
+
+ private static int hash(Object key) {
+ int code = key.hashCode();
+
+ // we need a strong hash function that generates diverse upper bits
+ // this hash function is more expensive than the "scramble" used by "java.util.HashMap",
+ // but required for this sort of hash table
+ code = (code + 0x7ed55d16) + (code << 12);
+ code = (code ^ 0xc761c23c) ^ (code >>> 19);
+ code = (code + 0x165667b1) + (code << 5);
+ code = (code + 0xd3a2646c) ^ (code << 9);
+ code = (code + 0xfd7046c5) + (code << 3);
+ return (code ^ 0xb55a4f09) ^ (code >>> 16);
+ }
+
+ private static int getRehashThreshold(int capacity) {
+ // divide before multiply, to avoid overflow
+ return capacity / 4 * 3;
+ }
+
+ // ------------------------------------------------------------------------
+ // Testing Utilities
+ // ------------------------------------------------------------------------
+
+ /**
+ * For testing only: Actively counts the number of entries, rather than using the
+ * counter variable. This method has linear complexity, rather than constant.
+ *
+ * @return The counted number of entries.
+ */
+ int traverseAndCountElements() {
+ int num = 0;
+
+ for (Entry<?, ?> entry : table) {
+ while (entry != null) {
+ num++;
+ entry = entry.next;
+ }
+ }
+
+ return num;
+ }
+
+ /**
+ * For testing only: Gets the length of the longest overflow chain.
+ * This method has linear complexity.
+ *
+ * @return The length of the longest overflow chain.
+ */
+ int getLongestChainLength() {
+ int maxLen = 0;
+
+ for (Entry<?, ?> entry : table) {
+ int thisLen = 0;
+ while (entry != null) {
+ thisLen++;
+ entry = entry.next;
+ }
+ maxLen = Math.max(maxLen, thisLen);
+ }
+
+ return maxLen;
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * An entry in the hash table.
+ *
+ * @param <K> Type of the key.
+ * @param <V> Type of the value.
+ */
+ public static final class Entry<K, V> {
+
+ final K key;
+ final int hashCode;
+
+ V value;
+ Entry<K, V> next;
+ long touchedTag;
+
+ Entry(K key, V value, int hashCode, Entry<K, V> next) {
+ this.key = key;
+ this.value = value;
+ this.next = next;
+ this.hashCode = hashCode;
+ }
+
+ public K getKey() {
+ return key;
+ }
+
+ public V getValue() {
+ return value;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Performs a traversal over the logical multi-map that results from the union of the
+ * given maps. This method does not actually build a union of the maps, but traverses the hash maps
+ * together.
+ *
+ * @param maps The array of maps whose union should be traversed. The array is sorted in place.
+ * @param visitor The visitor that is called for each key and all values.
+ * @param touchedTag A tag that is used to mark elements that have been touched in this specific
+ * traversal. Each successive traversal should supply a larger value for this
+ * tag than the previous one.
+ *
+ * @param <K> The type of the map's key.
+ * @param <V> The type of the map's value.
+ */
+ public static <K, V> void traverseMaps(
+ final KeyMap<K, V>[] maps,
+ final TraversalEvaluator<K, V> visitor,
+ final long touchedTag)
+ throws Exception
+ {
+ // we need to work on the maps in descending size
+ Arrays.sort(maps, CapacityDescendingComparator.INSTANCE);
+
+ final int[] shifts = new int[maps.length];
+ final int[] lowBitsMask = new int[maps.length];
+ final int numSlots = maps[0].table.length;
+ final int numTables = maps.length;
+
+ // figure out how much each hash table collapses the entries
+ for (int i = 0; i < numTables; i++) {
+ shifts[i] = maps[0].log2size - maps[i].log2size;
+ lowBitsMask[i] = (1 << shifts[i]) - 1;
+ }
+
+ // go over all slots (based on the largest hash table)
+ for (int pos = 0; pos < numSlots; pos++) {
+
+ // for each slot, go over all tables, until the table does not have that slot any more
+ // for tables where multiple slots collapse into one, we visit that one when we process the
+ // latest of all slots that collapse to that one
+ int mask;
+ for (int rootTable = 0;
+ rootTable < numTables && ((mask = lowBitsMask[rootTable]) & pos) == mask;
+ rootTable++)
+ {
+ // use that table to gather keys and start collecting keys from the following tables
+ // go over all entries of that slot in the table
+ Entry<K, V> entry = maps[rootTable].table[pos >> shifts[rootTable]];
+ while (entry != null) {
+ // take only entries that have not been collected as part of other tables
+ if (entry.touchedTag < touchedTag) {
+ entry.touchedTag = touchedTag;
+
+ final K key = entry.key;
+ final int hashCode = entry.hashCode;
+ visitor.startNewKey(key);
+ visitor.nextValue(entry.value);
+
+ addEntriesFromChain(entry.next, visitor, key, touchedTag, hashCode);
+
+ // go over the other hash tables and collect their entries for the key
+ for (int followupTable = rootTable + 1; followupTable < numTables; followupTable++) {
+ Entry<K, V> followupEntry = maps[followupTable].table[pos >> shifts[followupTable]];
+ if (followupEntry != null) {
+ addEntriesFromChain(followupEntry, visitor, key, touchedTag, hashCode);
+ }
+ }
+
+ visitor.keyDone();
+ }
+
+ entry = entry.next;
+ }
+ }
+ }
+ }
+
+ private static <K, V> void addEntriesFromChain(
+ Entry<K, V> entry,
+ TraversalEvaluator<K, V> visitor,
+ K key,
+ long touchedTag,
+ int hashCode) throws Exception
+ {
+ while (entry != null) {
+ if (entry.touchedTag < touchedTag && entry.hashCode == hashCode && entry.key.equals(key)) {
+ entry.touchedTag = touchedTag;
+ visitor.nextValue(entry.value);
+ }
+ entry = entry.next;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Comparator that defines a descending order on maps depending on their table capacity
+ * and number of elements.
+ */
+ static final class CapacityDescendingComparator implements Comparator<KeyMap<?, ?>> {
+
+ static final CapacityDescendingComparator INSTANCE = new CapacityDescendingComparator();
+
+ private CapacityDescendingComparator() {}
+
+
+ @Override
+ public int compare(KeyMap<?, ?> o1, KeyMap<?, ?> o2) {
+ // this sorts descending
+ int cmp = o2.getLog2TableCapacity() - o1.getLog2TableCapacity();
+ if (cmp != 0) {
+ return cmp;
+ }
+ else {
+ return o2.size() - o1.size();
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A factory for lazy/on-demand instantiation of values.
+ *
+ * @param <V> The type created by the factory.
+ */
+ public static interface LazyFactory<V> {
+
+ /**
+ * The factory method; creates the value.
+ * @return The value.
+ */
+ V create();
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A visitor for a traversal over the union of multiple hash maps. The visitor is
+ * called for each key in the union of the maps and all values associated with that key
+ * (one per map, but multiple across maps).
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+ public static interface TraversalEvaluator<K, V> {
+
+ /**
+ * Called whenever the traversal starts with a new key.
+ *
+ * @param key The key traversed.
+ * @throws Exception Method forwards all exceptions.
+ */
+ void startNewKey(K key) throws Exception;
+
+ /**
+ * Called for each value found for the current key.
+ *
+ * @param value The next value.
+ * @throws Exception Method forwards all exceptions.
+ */
+ void nextValue(V value) throws Exception;
+
+ /**
+ * Called when the traversal for the current key is complete.
+ *
+ * @throws Exception Method forwards all exceptions.
+ */
+ void keyDone() throws Exception;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
new file mode 100644
index 0000000..b34d0bc
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.windowpolicy.EventTime;
+import org.apache.flink.streaming.api.windowing.windowpolicy.ProcessingTime;
+import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+
+/**
+ * This class implements the conversion from window policies to concrete operator
+ * implementations.
+ */
+public class PolicyToOperator {
+
+ /**
+ * Entry point to create an operator for the given window policies and the window function.
+ */
+ public static <IN, OUT, KEY> OneInputStreamOperator<IN, OUT> createOperatorForPolicies(
+ WindowPolicy window, WindowPolicy slide, Function function, KeySelector<IN, KEY> keySelector)
+ {
+ if (window == null || function == null) {
+ throw new NullPointerException();
+ }
+
+ // -- case 1: both policies are processing time policies
+ if (window instanceof ProcessingTime && (slide == null || slide instanceof ProcessingTime)) {
+ final long windowLength = ((ProcessingTime) window).toMilliseconds();
+ final long windowSlide = slide == null ? windowLength : ((ProcessingTime) slide).toMilliseconds();
+
+ if (function instanceof ReduceFunction) {
+ @SuppressWarnings("unchecked")
+ ReduceFunction<IN> reducer = (ReduceFunction<IN>) function;
+
+ @SuppressWarnings("unchecked")
+ OneInputStreamOperator<IN, OUT> op = (OneInputStreamOperator<IN, OUT>)
+ new AggregatingProcessingTimeWindowOperator<KEY, IN>(
+ reducer, keySelector, windowLength, windowSlide);
+ return op;
+ }
+ else if (function instanceof KeyedWindowFunction) {
+ @SuppressWarnings("unchecked")
+ KeyedWindowFunction<IN, OUT, KEY> wf = (KeyedWindowFunction<IN, OUT, KEY>) function;
+
+ return new AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>(
+ wf, keySelector, windowLength, windowSlide);
+ }
+ }
+
+ // -- case 2: both policies are event time policies
+ if (window instanceof EventTime && (slide == null || slide instanceof EventTime)) {
+ // add event time implementation
+ }
+
+ throw new UnsupportedOperationException("The windowing mechanism does not yet support " + window.toString(slide));
+ }
+
+ // ------------------------------------------------------------------------
+
+ /** Don't instantiate */
+ private PolicyToOperator() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java
new file mode 100644
index 0000000..55749a1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the operators that implement the various window operations
+ * on data streams.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java
deleted file mode 100644
index 2e926bc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.apache.commons.math3.util.ArithmeticUtils;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.util.MathUtils;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-
-public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT> extends AbstractStreamOperator<OUT>
- implements OneInputStreamOperator<IN, OUT>, Triggerable {
-
- private static final long serialVersionUID = 3245500864882459867L;
-
- private static final long MIN_SLIDE_TIME = 50;
-
- // ----- fields for operator parametrization -----
-
- private final Function function;
- private final KeySelector<IN, KEY> keySelector;
-
- private final long windowSize;
- private final long windowSlide;
- private final long paneSize;
- private final int numPanesPerWindow;
-
- // ----- fields for operator functionality -----
-
- private transient AbstractKeyedTimePanes<IN, KEY, ?, OUT> panes;
-
- private transient TimestampedCollector<OUT> out;
-
- private transient long nextEvaluationTime;
- private transient long nextSlideTime;
-
- protected AbstractAlignedProcessingTimeWindowOperator(
- Function function,
- KeySelector<IN, KEY> keySelector,
- long windowLength,
- long windowSlide)
- {
- if (function == null || keySelector == null) {
- throw new NullPointerException();
- }
- if (windowLength < MIN_SLIDE_TIME) {
- throw new IllegalArgumentException("Window length must be at least " + MIN_SLIDE_TIME + " msecs");
- }
- if (windowSlide < MIN_SLIDE_TIME) {
- throw new IllegalArgumentException("Window slide must be at least " + MIN_SLIDE_TIME + " msecs");
- }
- if (windowLength < windowSlide) {
- throw new IllegalArgumentException("The window size must be larger than the window slide");
- }
-
- final long paneSlide = ArithmeticUtils.gcd(windowLength, windowSlide);
- if (paneSlide < MIN_SLIDE_TIME) {
- throw new IllegalArgumentException(String.format(
- "Cannot compute window of size %d msecs sliding by %d msecs. " +
- "The unit of grouping is too small: %d msecs", windowLength, windowSlide, paneSlide));
- }
-
- this.function = function;
- this.keySelector = keySelector;
- this.windowSize = windowLength;
- this.windowSlide = windowSlide;
- this.paneSize = paneSlide;
- this.numPanesPerWindow = MathUtils.checkedDownCast(windowLength / paneSlide);
- }
-
-
- protected abstract AbstractKeyedTimePanes<IN, KEY, ?, OUT> createPanes(
- KeySelector<IN, KEY> keySelector, Function function);
-
- // ------------------------------------------------------------------------
- // startup and shutdown
- // ------------------------------------------------------------------------
-
- @Override
- public void open(Configuration parameters) throws Exception {
- out = new TimestampedCollector<>(output);
-
- // create the panes that gather the elements per slide
- panes = createPanes(keySelector, function);
-
- // decide when to first compute the window and when to slide it
- // the values should align with the start of time (that is, the UNIX epoch, not the big bang)
- final long now = System.currentTimeMillis();
- nextEvaluationTime = now + windowSlide - (now % windowSlide);
- nextSlideTime = now + paneSize - (now % paneSize);
-
- getRuntimeContext().registerTimer(Math.min(nextEvaluationTime, nextSlideTime), this);
- }
-
- @Override
- public void close() throws Exception {
- final long finalWindowTimestamp = nextEvaluationTime;
-
- // early stop the triggering thread, so it does not attempt to return any more data
- stopTriggers();
-
- // emit the remaining data
- computeWindow(finalWindowTimestamp);
- }
-
- @Override
- public void dispose() {
- // acquire the lock during shutdown, to prevent trigger calls at the same time
- // fail-safe stop of the triggering thread (in case of an error)
- stopTriggers();
-
- // release the panes
- panes.dispose();
- }
-
- private void stopTriggers() {
- // reset the action timestamps. this makes sure any pending triggers will not evaluate
- nextEvaluationTime = -1L;
- nextSlideTime = -1L;
- }
-
- // ------------------------------------------------------------------------
- // Receiving elements and triggers
- // ------------------------------------------------------------------------
-
- @Override
- public void processElement(StreamRecord<IN> element) throws Exception {
- panes.addElementToLatestPane(element.getValue());
- }
-
- @Override
- public void processWatermark(Watermark mark) {
- // this operator does not react to watermarks
- }
-
- @Override
- public void trigger(long timestamp) throws Exception {
- // first we check if we actually trigger the window function
- if (timestamp == nextEvaluationTime) {
- // compute and output the results
- computeWindow(timestamp);
-
- nextEvaluationTime += windowSlide;
- }
-
- // check if we slide the panes by one. this may happen in addition to the
- // window computation, or just by itself
- if (timestamp == nextSlideTime) {
- panes.slidePanes(numPanesPerWindow);
- nextSlideTime += paneSize;
- }
-
- long nextTriggerTime = Math.min(nextEvaluationTime, nextSlideTime);
- getRuntimeContext().registerTimer(nextTriggerTime, this);
- }
-
- private void computeWindow(long timestamp) throws Exception {
- out.setTimestamp(timestamp);
- panes.truncatePanes(numPanesPerWindow);
- panes.evaluateWindow(out);
- }
-
- // ------------------------------------------------------------------------
- // Property access (for testing)
- // ------------------------------------------------------------------------
-
- public long getWindowSize() {
- return windowSize;
- }
-
- public long getWindowSlide() {
- return windowSlide;
- }
-
- public long getPaneSize() {
- return paneSize;
- }
-
- public int getNumPanesPerWindow() {
- return numPanesPerWindow;
- }
-
- public long getNextEvaluationTime() {
- return nextEvaluationTime;
- }
-
- public long getNextSlideTime() {
- return nextSlideTime;
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return "Window (processing time) (length=" + windowSize + ", slide=" + windowSlide + ')';
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractKeyedTimePanes.java
deleted file mode 100644
index a49b2e6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractKeyedTimePanes.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayDeque;
-
-
-public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
-
- protected KeyMap<Key, Aggregate> latestPane = new KeyMap<>();
-
- protected final ArrayDeque<KeyMap<Key, Aggregate>> previousPanes = new ArrayDeque<>();
-
- // ------------------------------------------------------------------------
-
- public abstract void addElementToLatestPane(Type element) throws Exception;
-
- public abstract void evaluateWindow(Collector<Result> out) throws Exception;
-
-
- public void dispose() {
- // since all is heap data, there is no need to clean up anything
- latestPane = null;
- previousPanes.clear();
- }
-
-
- public void slidePanes(int panesToKeep) {
- if (panesToKeep > 1) {
- // the current pane becomes the latest previous pane
- previousPanes.addLast(latestPane);
-
- // truncate the history
- while (previousPanes.size() >= panesToKeep) {
- previousPanes.removeFirst();
- }
- }
-
- // we need a new latest pane
- latestPane = new KeyMap<>();
- }
-
- public void truncatePanes(int numToRetain) {
- while (previousPanes.size() >= numToRetain) {
- previousPanes.removeFirst();
- }
- }
-
- protected void traverseAllPanes(KeyMap.TraversalEvaluator<Key, Aggregate> traversal, long traversalPass) throws Exception{
- // gather all panes in an array (faster iterations)
- @SuppressWarnings({"unchecked", "rawtypes"})
- KeyMap<Key, Aggregate>[] panes = previousPanes.toArray(new KeyMap[previousPanes.size() + 1]);
- panes[panes.length - 1] = latestPane;
-
- // let the maps make a coordinated traversal and evaluate the window function per contained key
- KeyMap.traverseMaps(panes, traversal, traversalPass);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
deleted file mode 100644
index 1212123..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.util.UnionIterator;
-import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-
-
-public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyedTimePanes<Type, Key, ArrayList<Type>, Result> {
-
- private final KeySelector<Type, Key> keySelector;
-
- private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory();
-
- private final KeyedWindowFunction<Type, Result, Key> function;
-
- private long evaluationPass;
-
- // ------------------------------------------------------------------------
-
- public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, KeyedWindowFunction<Type, Result, Key> function) {
- this.keySelector = keySelector;
- this.function = function;
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- public void addElementToLatestPane(Type element) throws Exception {
- Key k = keySelector.getKey(element);
- ArrayList<Type> elements = latestPane.putIfAbsent(k, listFactory);
- elements.add(element);
- }
-
- @Override
- public void evaluateWindow(Collector<Result> out) throws Exception {
- if (previousPanes.isEmpty()) {
- // optimized path for single pane case (tumbling window)
- for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) {
- function.evaluate(entry.getKey(), entry.getValue(), out);
- }
- }
- else {
- // general code path for multi-pane case
- WindowFunctionTraversal<Key, Type, Result> evaluator = new WindowFunctionTraversal<>(function, out);
- traverseAllPanes(evaluator, evaluationPass);
- }
-
- evaluationPass++;
- }
-
- // ------------------------------------------------------------------------
- // Running a window function in a map traversal
- // ------------------------------------------------------------------------
-
- static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
-
- private final KeyedWindowFunction<Type, Result, Key> function;
-
- private final UnionIterator<Type> unionIterator;
-
- private final Collector<Result> out;
-
- private Key currentKey;
-
- WindowFunctionTraversal(KeyedWindowFunction<Type, Result, Key> function, Collector<Result> out) {
- this.function = function;
- this.out = out;
- this.unionIterator = new UnionIterator<>();
- }
-
-
- @Override
- public void startNewKey(Key key) {
- unionIterator.clear();
- currentKey = key;
- }
-
- @Override
- public void nextValue(ArrayList<Type> value) {
- unionIterator.addList(value);
- }
-
- @Override
- public void keyDone() throws Exception {
- function.evaluate(currentKey, unionIterator, out);
- }
- }
-
- // ------------------------------------------------------------------------
- // Lazy factory for lists (put if absent)
- // ------------------------------------------------------------------------
-
- @SuppressWarnings("unchecked")
- private static <V> KeyMap.LazyFactory<ArrayList<V>> getListFactory() {
- return (KeyMap.LazyFactory<ArrayList<V>>) LIST_FACTORY;
- }
-
- private static final KeyMap.LazyFactory<?> LIST_FACTORY = new KeyMap.LazyFactory<ArrayList<?>>() {
-
- @Override
- public ArrayList<?> create() {
- return new ArrayList<>(4);
- }
- };
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
deleted file mode 100644
index fb9d163..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
-
-
-public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
- extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT> {
-
- private static final long serialVersionUID = 7305948082830843475L;
-
-
- public AccumulatingProcessingTimeWindowOperator(
- KeyedWindowFunction<IN, OUT, KEY> function,
- KeySelector<IN, KEY> keySelector,
- long windowLength,
- long windowSlide)
- {
- super(function, keySelector, windowLength, windowSlide);
- }
-
- @Override
- protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
- @SuppressWarnings("unchecked")
- KeyedWindowFunction<IN, OUT, KEY> windowFunction = (KeyedWindowFunction<IN, OUT, KEY>) function;
-
- return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingKeyedTimePanes.java
deleted file mode 100644
index 730c984..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingKeyedTimePanes.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.util.Collector;
-
-
-public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes<Type, Key, Type, Type> {
-
- private final KeySelector<Type, Key> keySelector;
-
- private final ReduceFunction<Type> reducer;
-
- private long evaluationPass;
-
- // ------------------------------------------------------------------------
-
- public AggregatingKeyedTimePanes(KeySelector<Type, Key> keySelector, ReduceFunction<Type> reducer) {
- this.keySelector = keySelector;
- this.reducer = reducer;
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- public void addElementToLatestPane(Type element) throws Exception {
- Key k = keySelector.getKey(element);
- latestPane.putOrAggregate(k, element, reducer);
- }
-
- @Override
- public void evaluateWindow(Collector<Type> out) throws Exception {
- if (previousPanes.isEmpty()) {
- // optimized path for single pane case
- for (KeyMap.Entry<Key, Type> entry : latestPane) {
- out.collect(entry.getValue());
- }
- }
- else {
- // general code path for multi-pane case
- AggregatingTraversal<Key, Type> evaluator = new AggregatingTraversal<>(reducer, out);
- traverseAllPanes(evaluator, evaluationPass);
- }
-
- evaluationPass++;
- }
-
- // ------------------------------------------------------------------------
- // The maps traversal that performs the final aggregation
- // ------------------------------------------------------------------------
-
- static final class AggregatingTraversal<Key, Type> implements KeyMap.TraversalEvaluator<Key, Type> {
-
- private final ReduceFunction<Type> function;
-
- private final Collector<Type> out;
-
- private Type currentValue;
-
- AggregatingTraversal(ReduceFunction<Type> function, Collector<Type> out) {
- this.function = function;
- this.out = out;
- }
-
- @Override
- public void startNewKey(Key key) {
- currentValue = null;
- }
-
- @Override
- public void nextValue(Type value) throws Exception {
- if (currentValue != null) {
- currentValue = function.reduce(currentValue, value);
- }
- else {
- currentValue = value;
- }
- }
-
- @Override
- public void keyDone() throws Exception {
- out.collect(currentValue);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingProcessingTimeWindowOperator.java
deleted file mode 100644
index 8bed749..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingProcessingTimeWindowOperator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-
-public class AggregatingProcessingTimeWindowOperator<KEY, IN>
- extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN> {
-
- private static final long serialVersionUID = 7305948082830843475L;
-
-
- public AggregatingProcessingTimeWindowOperator(
- ReduceFunction<IN> function,
- KeySelector<IN, KEY> keySelector,
- long windowLength,
- long windowSlide)
- {
- super(function, keySelector, windowLength, windowSlide);
- }
-
- @Override
- protected AggregatingKeyedTimePanes<IN, KEY> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
- @SuppressWarnings("unchecked")
- ReduceFunction<IN> windowFunction = (ReduceFunction<IN>) function;
-
- return new AggregatingKeyedTimePanes<IN, KEY>(keySelector, windowFunction);
- }
-}