You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/09/28 18:15:07 UTC
[11/12] flink git commit: [FLINK-2677] Add a general-purpose
keyed-window operator
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
new file mode 100644
index 0000000..cddcc42
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -0,0 +1,115 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * A {@link WindowOperator} that consults an {@link Evictor} every time a window is emitted.
+ *
+ * <p>Before the window function is evaluated, the evictor is asked how many elements should be
+ * dropped from the window's {@link EvictingWindowBuffer}; that many elements are removed and the
+ * window function only sees what is left.
+ *
+ * @param <K> type of the key the elements are grouped by
+ * @param <IN> type of the incoming elements
+ * @param <OUT> type of the elements emitted by the window function
+ * @param <W> type of the windows elements are assigned to
+ */
+public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends WindowOperator<K, IN, OUT, W> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(EvictingWindowOperator.class);
+
+    /** Decides, at emission time, how many buffered elements are dropped. */
+    private final Evictor<? super IN, ? super W> evictor;
+
+    /**
+     * Creates an evicting window operator.
+     *
+     * @param windowAssigner assigns incoming elements to windows
+     * @param keySelector extracts the key of an element
+     * @param windowBufferFactory creates the per-window buffers; they must support eviction
+     * @param windowFunction function evaluated over the window contents when a window is emitted
+     * @param trigger template trigger, duplicated by the parent operator for every window
+     * @param evictor evictor consulted right before the window function is evaluated
+     */
+    public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
+            KeySelector<IN, K> keySelector,
+            WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory,
+            KeyedWindowFunction<IN, OUT, K, W> windowFunction,
+            Trigger<? super IN, ? super W> trigger,
+            Evictor<? super IN, ? super W> evictor) {
+        super(windowAssigner, keySelector, windowBufferFactory, windowFunction, trigger);
+        this.evictor = evictor;
+    }
+
+    /**
+     * Evicts elements from the window's buffer and then evaluates the window function on the
+     * remaining elements.
+     *
+     * @param key key the window belongs to
+     * @param window window to emit
+     * @param purge if {@code true} the window is removed from the operator's bookkeeping
+     * @throws Exception forwarded from the evictor or the window function
+     */
+    @Override
+    // two separate tokens are required; a single "unchecked, rawtypes" string suppresses nothing
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    protected void emitWindow(K key, W window, boolean purge) throws Exception {
+
+        timestampedCollector.setTimestamp(window.getEnd());
+
+        Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = windows.get(key);
+
+        if (keyWindows == null) {
+            LOG.debug("Window {} for key {} already gone.", window, key);
+            return;
+        }
+
+        Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger;
+        if (purge) {
+            bufferAndTrigger = keyWindows.remove(window);
+        } else {
+            bufferAndTrigger = keyWindows.get(window);
+        }
+
+        if (bufferAndTrigger == null) {
+            LOG.debug("Window {} for key {} already gone.", window, key);
+            return;
+        }
+
+        // the constructor only accepts factories that produce evicting buffers
+        EvictingWindowBuffer<IN> windowBuffer = (EvictingWindowBuffer<IN>) bufferAndTrigger.f0;
+
+        int toEvict = 0;
+        if (windowBuffer.size() > 0) {
+            // need some type trickery here: the evictor is declared on "? super IN", so the
+            // element iterable has to be passed as a raw type
+            toEvict = evictor.evict((Iterable) windowBuffer.getElements(), windowBuffer.size(), window);
+        }
+
+        windowBuffer.removeElements(toEvict);
+
+        userFunction.evaluate(key,
+                window,
+                windowBuffer.getUnpackedElements(),
+                timestampedCollector);
+
+        // drop the per-key map once its last window has been purged
+        if (keyWindows.isEmpty()) {
+            windows.remove(key);
+        }
+    }
+
+    /**
+     * Same as the parent method, but returns this operator with its concrete type so that calls
+     * can be chained.
+     */
+    @Override
+    public EvictingWindowOperator<K, IN, OUT, W> enableSetProcessingTime(boolean setProcessingTime) {
+        super.enableSetProcessingTime(setProcessingTime);
+        return this;
+    }
+
+
+    // ------------------------------------------------------------------------
+    // Getters for testing
+    // ------------------------------------------------------------------------
+
+    @VisibleForTesting
+    public Evictor<? super IN, ? super W> getEvictor() {
+        return evictor;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
index b1ff7e2..880c85c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
@@ -18,15 +18,41 @@
package org.apache.flink.streaming.runtime.operators.windowing;
+import org.apache.commons.lang.SerializationUtils;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.DeltaEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ContinuousWatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.windowpolicy.Count;
+import org.apache.flink.streaming.api.windowing.windowpolicy.Delta;
import org.apache.flink.streaming.api.windowing.windowpolicy.EventTime;
import org.apache.flink.streaming.api.windowing.windowpolicy.ProcessingTime;
import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
/**
* This class implements the conversion from window policies to concrete operator
@@ -55,7 +81,7 @@ public class PolicyToOperator {
@SuppressWarnings("unchecked")
OneInputStreamOperator<IN, OUT> op = (OneInputStreamOperator<IN, OUT>)
- new AggregatingProcessingTimeWindowOperator<KEY, IN>(
+ new AggregatingProcessingTimeWindowOperator<>(
reducer, keySelector, windowLength, windowSlide);
return op;
}
@@ -63,17 +89,147 @@ public class PolicyToOperator {
@SuppressWarnings("unchecked")
KeyedWindowFunction<IN, OUT, KEY, Window> wf = (KeyedWindowFunction<IN, OUT, KEY, Window>) function;
- return new AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>(
+ return new AccumulatingProcessingTimeWindowOperator<>(
wf, keySelector, windowLength, windowSlide);
}
}
// -- case 2: both policies are event time policies
if (window instanceof EventTime && (slide == null || slide instanceof EventTime)) {
- // add event time implementation
+ final long windowLength = ((EventTime) window).toMilliseconds();
+ final long windowSlide = slide == null ? windowLength : ((EventTime) slide).toMilliseconds();
+
+ WindowAssigner<? super IN, TimeWindow> assigner;
+ if (windowSlide == windowLength) {
+ assigner = TumblingTimeWindows.of(windowLength);
+ } else {
+ assigner = SlidingTimeWindows.of(windowLength, windowSlide);
+ }
+ WindowBufferFactory<IN, ? extends WindowBuffer<IN>> windowBuffer;
+ if (function instanceof ReduceFunction) {
+ @SuppressWarnings("unchecked")
+ ReduceFunction<IN> reducer = (ReduceFunction<IN>) SerializationUtils.clone(function);
+ function = new ReduceWindowFunction<>(reducer);
+ windowBuffer = new PreAggregatingHeapWindowBuffer.Factory<>(reducer);
+ } else {
+ windowBuffer = new HeapWindowBuffer.Factory<>();
+ }
+ @SuppressWarnings("unchecked")
+ KeyedWindowFunction<IN, OUT, KEY, TimeWindow> windowFunction = (KeyedWindowFunction<IN, OUT, KEY, TimeWindow>) function;
+
+ return new WindowOperator<>(
+ assigner,
+ keySelector,
+ windowBuffer,
+ windowFunction,
+ WatermarkTrigger.create());
}
-
- throw new UnsupportedOperationException("The windowing mechanism does not yet support " + window.toString(slide));
+
+ // -- case 3: arbitrary trigger, no eviction
+ if (slide == null) {
+ Trigger<? super IN, GlobalWindow> trigger = policyToTrigger(window);
+ // we need to make them purging triggers because the trigger/eviction policy model
+ // expects that the window is purged when no slide is used
+ Trigger<? super IN, GlobalWindow> purgingTrigger = PurgingTrigger.of(trigger);
+
+ WindowBufferFactory<IN, ? extends WindowBuffer<IN>> windowBuffer;
+ if (function instanceof ReduceFunction) {
+ @SuppressWarnings("unchecked")
+ ReduceFunction<IN> reducer = (ReduceFunction<IN>) SerializationUtils.clone(function);
+ function = new ReduceWindowFunction<>(reducer);
+ windowBuffer = new PreAggregatingHeapWindowBuffer.Factory<>(reducer);
+ } else {
+ windowBuffer = new HeapWindowBuffer.Factory<>();
+ }
+
+ if (!(function instanceof KeyedWindowFunction)) {
+ throw new IllegalStateException("Windowing function is not of type EvaluateKeyedWindowFunction.");
+ }
+ @SuppressWarnings("unchecked")
+ KeyedWindowFunction<IN, OUT, KEY, GlobalWindow> windowFunction = (KeyedWindowFunction<IN, OUT, KEY, GlobalWindow>) function;
+
+ return new WindowOperator<>(
+ GlobalWindows.<IN>create(),
+ keySelector,
+ windowBuffer,
+ windowFunction,
+ purgingTrigger);
+ }
+
+ // -- case 4: arbitrary trigger, arbitrary eviction
+ Trigger<? super IN, GlobalWindow> trigger = policyToTrigger(slide);
+ Evictor<? super IN, GlobalWindow> evictor = policyToEvictor(window);
+
+ WindowBufferFactory<IN, ? extends EvictingWindowBuffer<IN>> windowBuffer = new HeapWindowBuffer.Factory<>();
+ if (function instanceof ReduceFunction) {
+ @SuppressWarnings("unchecked")
+ ReduceFunction<IN> reducer = (ReduceFunction<IN>) SerializationUtils.clone(function);
+ function = new ReduceWindowFunction<>(reducer);
+ }
+
+ if (!(function instanceof KeyedWindowFunction)) {
+ throw new IllegalStateException("Windowing function is not of type EvaluateKeyedWindowFunction.");
+ }
+
+ @SuppressWarnings("unchecked")
+ KeyedWindowFunction<IN, OUT, KEY, GlobalWindow> windowFunction = (KeyedWindowFunction<IN, OUT, KEY, GlobalWindow>) function;
+
+ EvictingWindowOperator<KEY, IN, OUT, GlobalWindow> op = new EvictingWindowOperator<>(
+ GlobalWindows.<IN>create(),
+ keySelector,
+ windowBuffer,
+ windowFunction,
+ trigger,
+ evictor);
+
+ if (window instanceof ProcessingTime) {
+ // special case, we need to instruct the window operator to store the processing time in
+ // the elements so that the evictor can work on that
+ op.enableSetProcessingTime(true);
+ }
+
+ return op;
+ }
+
+    /**
+     * Translates a window policy into the trigger used on {@link GlobalWindow}s.
+     *
+     * <p>{@link EventTime} maps to a {@link ContinuousWatermarkTrigger}, {@link ProcessingTime} to a
+     * {@link ContinuousProcessingTimeTrigger}, {@link Count} to a {@link CountTrigger} and
+     * {@link Delta} to a {@link DeltaTrigger}.
+     *
+     * @param policy the policy to translate
+     * @param <IN> type of the elements the trigger will see
+     * @return the trigger corresponding to {@code policy}
+     * @throws UnsupportedOperationException if the policy is of none of the types listed above
+     */
+    private static <IN> Trigger<? super IN, GlobalWindow> policyToTrigger(WindowPolicy policy) {
+        if (policy instanceof EventTime) {
+            EventTime eventTime = (EventTime) policy;
+            return ContinuousWatermarkTrigger.of(eventTime.getSize());
+        } else if (policy instanceof ProcessingTime) {
+            ProcessingTime processingTime = (ProcessingTime) policy;
+            return ContinuousProcessingTimeTrigger.of(processingTime.getSize());
+        } else if (policy instanceof Count) {
+            Count count = (Count) policy;
+            return CountTrigger.of(count.getSize());
+        } else if (policy instanceof Delta) {
+            // two separate tokens are required; a single "unchecked,rawtypes" string suppresses nothing
+            @SuppressWarnings({"unchecked", "rawtypes"})
+            Delta<IN> delta = (Delta) policy;
+            return DeltaTrigger.of(delta.getThreshold(), delta.getDeltaFunction());
+
+        }
+
+        throw new UnsupportedOperationException("Unsupported policy " + policy);
+    }
+
+    /**
+     * Translates a window policy into the evictor used on {@link GlobalWindow}s.
+     *
+     * <p>{@link EventTime} and {@link ProcessingTime} both map to a {@link TimeEvictor},
+     * {@link Count} maps to a {@link CountEvictor} and {@link Delta} to a {@link DeltaEvictor}.
+     *
+     * @param policy the policy to translate
+     * @param <IN> type of the elements the evictor will see
+     * @return the evictor corresponding to {@code policy}
+     * @throws UnsupportedOperationException if the policy is of none of the types listed above
+     */
+    private static <IN> Evictor<? super IN, GlobalWindow> policyToEvictor(WindowPolicy policy) {
+        if (policy instanceof EventTime) {
+            EventTime eventTime = (EventTime) policy;
+            return TimeEvictor.of(eventTime.getSize());
+        } else if (policy instanceof ProcessingTime) {
+            ProcessingTime processingTime = (ProcessingTime) policy;
+            return TimeEvictor.of(processingTime.getSize());
+        } else if (policy instanceof Count) {
+            Count count = (Count) policy;
+            return CountEvictor.of(count.getSize());
+        } else if (policy instanceof Delta) {
+            // two separate tokens are required; a single "unchecked,rawtypes" string suppresses nothing
+            @SuppressWarnings({"unchecked", "rawtypes"})
+            Delta<IN> delta = (Delta) policy;
+            return DeltaEvictor.of(delta.getThreshold(), delta.getDeltaFunction());
+
+        }
+
+
+        throw new UnsupportedOperationException("Unsupported policy " + policy);
     }
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
new file mode 100644
index 0000000..cda4481
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -0,0 +1,320 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * General-purpose keyed window operator.
+ *
+ * <p>Every incoming element is assigned to zero or more windows by the {@link WindowAssigner}. For
+ * each (key, window) pair the operator keeps a {@link WindowBuffer} with copies of the elements and
+ * a duplicate of the {@link Trigger} template. The trigger sees every element and every timer it
+ * registered, and decides when the {@link KeyedWindowFunction} is evaluated over the buffer and
+ * whether the window is purged afterwards.
+ *
+ * @param <K> type of the key the elements are grouped by
+ * @param <IN> type of the incoming elements
+ * @param <OUT> type of the elements emitted by the window function
+ * @param <W> type of the windows elements are assigned to
+ */
+public class WindowOperator<K, IN, OUT, W extends Window>
+        extends AbstractUdfStreamOperator<OUT, KeyedWindowFunction<IN, OUT, K, W>>
+        implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(WindowOperator.class);
+
+
+    private final WindowAssigner<? super IN, W> windowAssigner;
+    private final KeySelector<IN, K> keySelector;
+
+    /** Never used directly; every window gets its own duplicate of this trigger. */
+    private final Trigger<? super IN, ? super W> triggerTemplate;
+    private final WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory;
+
+    /** Buffer and trigger state of every live window, grouped by key. */
+    protected transient Map<K, Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>>> windows;
+
+    /** Pending timers: timestamp to the trigger contexts that asked to be called at that time. */
+    private transient Map<Long, Set<TriggerContext>> processingTimeTimers;
+    private transient Map<Long, Set<TriggerContext>> watermarkTimers;
+
+    protected transient TimestampedCollector<OUT> timestampedCollector;
+
+    /** If set, the timestamp of incoming elements is overwritten with the wall-clock time. */
+    private boolean setProcessingTime = false;
+
+    /** Used to copy elements before buffering them; set through {@link #setInputType}. */
+    private TypeSerializer<IN> inputSerializer;
+
+    /**
+     * Creates a window operator.
+     *
+     * @param windowAssigner assigns incoming elements to windows
+     * @param keySelector extracts the key of an element
+     * @param windowBufferFactory creates the buffer of every new window
+     * @param windowFunction function evaluated over the window contents when a window is emitted
+     * @param trigger template trigger, duplicated for every new window
+     */
+    public WindowOperator(WindowAssigner<? super IN, W> windowAssigner,
+            KeySelector<IN, K> keySelector,
+            WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
+            KeyedWindowFunction<IN, OUT, K, W> windowFunction,
+            Trigger<? super IN, ? super W> trigger) {
+
+        super(windowFunction);
+
+        this.windowAssigner = windowAssigner;
+        this.keySelector = keySelector;
+
+        this.windowBufferFactory = windowBufferFactory;
+        this.triggerTemplate = trigger;
+
+        setChainingStrategy(ChainingStrategy.ALWAYS);
+    }
+
+    /** Derives the serializer that is used to copy elements before they are buffered. */
+    @Override
+    @SuppressWarnings("unchecked")
+    public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
+        inputSerializer = (TypeSerializer<IN>) type.createSerializer(executionConfig);
+    }
+
+    /**
+     * Initializes the transient state and opens the window buffer factory.
+     *
+     * @throws IllegalStateException if {@link #setInputType} was not called beforehand
+     */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        windows = Maps.newHashMap();
+        watermarkTimers = Maps.newHashMap();
+        processingTimeTimers = Maps.newHashMap();
+        timestampedCollector = new TimestampedCollector<>(output);
+
+        if (inputSerializer == null) {
+            throw new IllegalStateException("Input serializer was not set.");
+        }
+
+        windowBufferFactory.setRuntimeContext(getRuntimeContext());
+        windowBufferFactory.open(parameters);
+    }
+
+    /** Emits every window that is still buffered, then releases all window state. */
+    @Override
+    public void close() throws Exception {
+        // NOTE(review): the user function is presumably closed by super.close() before the
+        // remaining windows are evaluated below - confirm that this order is intended
+        super.close();
+        // emit the elements that we still keep; purge is false so that neither of the maps
+        // we are iterating over is modified by emitWindow
+        for (Map.Entry<K, Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>>> entry: windows.entrySet()) {
+            K key = entry.getKey();
+            Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = entry.getValue();
+            for (W window: keyWindows.keySet()) {
+                emitWindow(key, window, false);
+            }
+        }
+        windows.clear();
+        windowBufferFactory.close();
+    }
+
+    /**
+     * Stores a copy of the element in the buffer of every window it is assigned to and lets the
+     * trigger of each of those windows decide whether the window is emitted.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public void processElement(StreamRecord<IN> element) throws Exception {
+        if (setProcessingTime) {
+            element.replace(element.getValue(), System.currentTimeMillis());
+        }
+        Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
+
+        K key = keySelector.getKey(element.getValue());
+
+        for (W window: elementWindows) {
+            // Look the per-key map up again for every window. A FIRE_AND_PURGE result for an
+            // earlier window of this element can remove the (then empty) map from 'windows';
+            // writing into a cached reference would store the following windows in a map that
+            // the operator can no longer reach.
+            Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = windows.get(key);
+            if (keyWindows == null) {
+                keyWindows = Maps.newHashMap();
+                windows.put(key, keyWindows);
+            }
+
+            Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger = keyWindows.get(window);
+            if (bufferAndTrigger == null) {
+                bufferAndTrigger = new Tuple2<>();
+                bufferAndTrigger.f0 = windowBufferFactory.create();
+                bufferAndTrigger.f1 = new TriggerContext(key, window, triggerTemplate.duplicate());
+                keyWindows.put(window, bufferAndTrigger);
+            }
+            // every window gets its own copy of the element
+            StreamRecord<IN> elementCopy = new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp());
+            bufferAndTrigger.f0.storeElement(elementCopy);
+            Trigger.TriggerResult triggerResult = bufferAndTrigger.f1.trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, bufferAndTrigger.f1);
+            processTriggerResult(triggerResult, key, window);
+        }
+    }
+
+    /**
+     * Evaluates the window function over the current contents of the given window.
+     *
+     * <p>Does nothing except logging if the window is not (or no longer) known for the key.
+     *
+     * @param key key the window belongs to
+     * @param window window to emit
+     * @param purge if {@code true} the window is removed from the operator's bookkeeping
+     * @throws Exception forwarded from the window function
+     */
+    protected void emitWindow(K key, W window, boolean purge) throws Exception {
+        timestampedCollector.setTimestamp(window.getEnd());
+
+        Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = windows.get(key);
+
+        if (keyWindows == null) {
+            LOG.debug("Window {} for key {} already gone.", window, key);
+            return;
+        }
+
+        Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger;
+        if (purge) {
+            bufferAndTrigger = keyWindows.remove(window);
+        } else {
+            bufferAndTrigger = keyWindows.get(window);
+        }
+
+        if (bufferAndTrigger == null) {
+            LOG.debug("Window {} for key {} already gone.", window, key);
+            return;
+        }
+
+
+        userFunction.evaluate(key,
+                window,
+                bufferAndTrigger.f0.getUnpackedElements(),
+                timestampedCollector);
+
+        // drop the per-key map once its last window has been purged
+        if (keyWindows.isEmpty()) {
+            windows.remove(key);
+        }
+    }
+
+    /** Emits (and possibly purges) the window according to the decision of its trigger. */
+    private void processTriggerResult(Trigger.TriggerResult triggerResult, K key, W window) throws Exception {
+        switch (triggerResult) {
+            case FIRE:
+                emitWindow(key, window, false);
+                break;
+
+            case FIRE_AND_PURGE:
+                emitWindow(key, window, true);
+                break;
+
+            case CONTINUE:
+                // ignore
+        }
+    }
+
+    /**
+     * Fires all watermark timers with a timestamp up to and including the watermark, then
+     * forwards the watermark downstream.
+     */
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        fireDueTimers(watermarkTimers, mark.getTimestamp());
+        output.emitWatermark(mark);
+    }
+
+    /**
+     * Fires all processing-time timers with a timestamp up to and including {@code time}.
+     *
+     * @param time the time for which this operator is being triggered
+     */
+    @Override
+    public void trigger(long time) throws Exception {
+        // "<=" as for watermarks: with a strict "<" a timer registered for exactly 'time'
+        // would not fire when the operator is triggered for that very timestamp
+        fireDueTimers(processingTimeTimers, time);
+    }
+
+    /**
+     * Removes all timers with a timestamp {@code <= time} from {@code timers} and then calls the
+     * triggers that registered them.
+     *
+     * <p>The due timers are detached from the map before any trigger is called because
+     * {@code onTime} may register new timers, which must not modify the map (or one of its
+     * sets) while it is being iterated.
+     */
+    private void fireDueTimers(Map<Long, Set<TriggerContext>> timers, long time) throws Exception {
+        Map<Long, Set<TriggerContext>> due = Maps.newHashMap();
+        for (Map.Entry<Long, Set<TriggerContext>> entry: timers.entrySet()) {
+            if (entry.getKey() <= time) {
+                due.put(entry.getKey(), entry.getValue());
+            }
+        }
+        timers.keySet().removeAll(due.keySet());
+
+        for (Set<TriggerContext> contexts: due.values()) {
+            for (TriggerContext context: contexts) {
+                Trigger.TriggerResult triggerResult = context.trigger.onTime(time, context);
+                processTriggerResult(triggerResult, context.key, context.window);
+            }
+        }
+    }
+
+    /**
+     * Per-window trigger state: the window's own trigger instance together with the key and
+     * window it belongs to. Triggers use it to register timers with the enclosing operator.
+     */
+    protected class TriggerContext implements Trigger.TriggerContext {
+        Trigger<? super IN, ? super W> trigger;
+        K key;
+        W window;
+
+        public TriggerContext(K key, W window, Trigger<? super IN, ? super W> trigger) {
+            this.key = key;
+            this.window = window;
+            this.trigger = trigger;
+        }
+
+        @Override
+        public void registerProcessingTimeTimer(long time) {
+            Set<TriggerContext> triggers = processingTimeTimers.get(time);
+            if (triggers == null) {
+                // first timer for this timestamp: ask the runtime to call back the operator
+                getRuntimeContext().registerTimer(time, WindowOperator.this);
+                triggers = Sets.newHashSet();
+                processingTimeTimers.put(time, triggers);
+            }
+            triggers.add(this);
+        }
+
+        @Override
+        public void registerWatermarkTimer(long time) {
+            Set<TriggerContext> triggers = watermarkTimers.get(time);
+            if (triggers == null) {
+                triggers = Sets.newHashSet();
+                watermarkTimers.put(time, triggers);
+            }
+            triggers.add(this);
+        }
+    }
+
+    /**
+     * When this flag is enabled the current processing time is set as the timestamp of elements
+     * upon arrival. This must be used, for example, when using the
+     * {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor} with processing
+     * time semantics.
+     */
+    public WindowOperator<K, IN, OUT, W> enableSetProcessingTime(boolean setProcessingTime) {
+        this.setProcessingTime = setProcessingTime;
+        return this;
+    }
+
+    // ------------------------------------------------------------------------
+    // Getters for testing
+    // ------------------------------------------------------------------------
+
+    @VisibleForTesting
+    public Trigger<? super IN, ? super W> getTriggerTemplate() {
+        return triggerTemplate;
+    }
+
+    @VisibleForTesting
+    public KeySelector<IN, K> getKeySelector() {
+        return keySelector;
+    }
+
+    @VisibleForTesting
+    public WindowAssigner<? super IN, W> getWindowAssigner() {
+        return windowAssigner;
+    }
+
+    @VisibleForTesting
+    public WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> getWindowBufferFactory() {
+        return windowBufferFactory;
+    }
+
+    @VisibleForTesting
+    public boolean isSetProcessingTime() {
+        return setProcessingTime;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
new file mode 100644
index 0000000..50e392b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+
+/**
+ * A {@link WindowBuffer} from which already stored elements can be removed again.
+ *
+ * @param <T> type of the buffered elements
+ */
+public interface EvictingWindowBuffer<T> extends WindowBuffer<T> {
+
+    /**
+     * Removes {@code count} elements from this buffer.
+     *
+     * @param count number of elements to remove
+     * @return NOTE(review): the meaning of the result is not specified by this interface -
+     *         confirm against the implementations before relying on it
+     */
+    boolean removeElements(int count);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
new file mode 100644
index 0000000..092718a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayDeque;
+
+public class HeapWindowBuffer<T> implements EvictingWindowBuffer<T> {
+ private static final long serialVersionUID = 1L;
+
+ private ArrayDeque<StreamRecord<T>> elements;
+
+ protected HeapWindowBuffer() {
+ this.elements = new ArrayDeque<>();
+ }
+
+ @Override
+ public void storeElement(StreamRecord<T> element) {
+ elements.add(element);
+ }
+
+ @Override
+ public boolean removeElements(int count) {
+ // TODO determine if this can be done in a better way
+ for (int i = 0; i < count; i++) {
+ elements.removeFirst();
+ }
+ return false;
+ }
+
+ @Override
+ public Iterable<StreamRecord<T>> getElements() {
+ return elements;
+ }
+
+ @Override
+ public Iterable<T> getUnpackedElements() {
+ return FluentIterable.from(elements).transform(new Function<StreamRecord<T>, T>() {
+ @Override
+ public T apply(StreamRecord<T> record) {
+ return record.getValue();
+ }
+ });
+ }
+
+ @Override
+ public int size() {
+ return elements.size();
+ }
+
+ public static class Factory<T> implements WindowBufferFactory<T, HeapWindowBuffer<T>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void setRuntimeContext(RuntimeContext ctx) {}
+
+ @Override
+ public void open(Configuration config) {}
+
+ @Override
+ public void close() {}
+
+ @Override
+ public HeapWindowBuffer<T> create() {
+ return new HeapWindowBuffer<>();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
new file mode 100644
index 0000000..85f90b0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Collections;
+
+/**
+ * A {@link WindowBuffer} that holds at most one record: the running result of applying a
+ * {@link ReduceFunction} to every element stored so far.
+ *
+ * <p>Before the first element is stored the buffer is empty: {@link #size()} is 0 and both
+ * element accessors return an empty iterable.
+ *
+ * @param <T> type of the elements being reduced
+ */
+public class PreAggregatingHeapWindowBuffer<T> implements WindowBuffer<T> {
+    private static final long serialVersionUID = 1L;
+
+    private final ReduceFunction<T> reduceFunction;
+
+    /** Running aggregate; {@code null} until the first element has been stored. */
+    private transient StreamRecord<T> data;
+
+    /**
+     * @param reduceFunction function used to fold each new element into the aggregate
+     */
+    protected PreAggregatingHeapWindowBuffer(ReduceFunction<T> reduceFunction) {
+        this.reduceFunction = reduceFunction;
+    }
+
+    /**
+     * Folds the element into the running aggregate.
+     *
+     * <p>The first element is copied (value and timestamp) into a fresh record. For every later
+     * element only the aggregate's value is replaced, with
+     * {@code reduce(currentValue, element.getValue())}.
+     *
+     * @param element the record to fold in
+     * @throws Exception whatever the reduce function throws
+     */
+    @Override
+    public void storeElement(StreamRecord<T> element) throws Exception {
+        if (data == null) {
+            data = new StreamRecord<>(element.getValue(), element.getTimestamp());
+        } else {
+            data.replace(reduceFunction.reduce(data.getValue(), element.getValue()));
+        }
+    }
+
+    /**
+     * @return the aggregate record as a single-element iterable, or an empty iterable if nothing
+     *         has been stored yet
+     */
+    @Override
+    public Iterable<StreamRecord<T>> getElements() {
+        if (data == null) {
+            // previously this produced a singleton containing null
+            return Collections.<StreamRecord<T>>emptyList();
+        }
+        return Collections.singleton(data);
+    }
+
+    /**
+     * @return the aggregate value as a single-element iterable, or an empty iterable if nothing
+     *         has been stored yet
+     */
+    @Override
+    public Iterable<T> getUnpackedElements() {
+        if (data == null) {
+            // previously this dereferenced the missing aggregate and threw a NullPointerException
+            return Collections.<T>emptyList();
+        }
+        return Collections.singleton(data.getValue());
+    }
+
+    /**
+     * @return 1 once an element has been stored, 0 before that
+     */
+    @Override
+    public int size() {
+        return data == null ? 0 : 1;
+    }
+
+    /**
+     * Factory for {@link PreAggregatingHeapWindowBuffer}s. All created buffers share the same
+     * reduce function instance, and the factory forwards its lifecycle calls to that function
+     * through {@link FunctionUtils}.
+     *
+     * @param <T> type of the elements being reduced
+     */
+    public static class Factory<T> implements WindowBufferFactory<T, PreAggregatingHeapWindowBuffer<T>> {
+        private static final long serialVersionUID = 1L;
+
+        private final ReduceFunction<T> reduceFunction;
+
+        /**
+         * @param reduceFunction function handed to every buffer this factory creates
+         */
+        public Factory(ReduceFunction<T> reduceFunction) {
+            this.reduceFunction = reduceFunction;
+        }
+
+        @Override
+        public void setRuntimeContext(RuntimeContext ctx) {
+            FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
+        }
+
+        @Override
+        public void open(Configuration config) throws Exception {
+            FunctionUtils.openFunction(reduceFunction, config);
+        }
+
+        @Override
+        public void close() throws Exception {
+            FunctionUtils.closeFunction(reduceFunction);
+        }
+
+        @Override
+        public PreAggregatingHeapWindowBuffer<T> create() {
+            return new PreAggregatingHeapWindowBuffer<T>(reduceFunction);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
new file mode 100644
index 0000000..8c891d5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+
+/**
+ * A serializable buffer that accepts {@link StreamRecord}s and exposes its contents again,
+ * either as records or as plain values.
+ *
+ * @param <T> type of the values carried by the records
+ */
+public interface WindowBuffer<T> extends Serializable {
+
+    /**
+     * Hands a record to the buffer. How the record is kept is up to the implementation.
+     *
+     * @param element the record to store
+     * @throws Exception if the implementation fails while storing
+     */
+    void storeElement(StreamRecord<T> element) throws Exception;
+
+    /**
+     * @return the buffer contents as stream records
+     */
+    Iterable<StreamRecord<T>> getElements();
+
+    /**
+     * @return the buffer contents as plain values of type {@code T}
+     */
+    Iterable<T> getUnpackedElements();
+
+    /**
+     * @return the number of entries the buffer reports holding
+     */
+    int size();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
new file mode 100644
index 0000000..4a7f6df
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.Serializable;
+
+/**
+ * Serializable factory for {@link WindowBuffer} instances of type {@code B}.
+ *
+ * <p>Besides {@link #create()}, the factory declares lifecycle hooks ({@code setRuntimeContext},
+ * {@code open}, {@code close}). NOTE(review): presumably these are driven by the operator that
+ * owns the factory - confirm against the caller.
+ *
+ * @param <T> type of the values held by the created buffers
+ * @param <B> concrete buffer type this factory creates
+ */
+public interface WindowBufferFactory<T, B extends WindowBuffer<T>> extends Serializable {
+ /** Supplies a runtime context to the factory. */
+ void setRuntimeContext(RuntimeContext ctx);
+ /** Lifecycle hook that receives a configuration; may fail with any exception. */
+ void open(Configuration config) throws Exception;
+ /** Lifecycle hook counterpart to {@code open}; may fail with any exception. */
+ void close() throws Exception;
+ /** Creates a buffer of type {@code B}. */
+ B create();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
new file mode 100644
index 0000000..3d9605e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class EvictingWindowOperatorTest {
+
+ // The AtomicInteger handed to SumReducer counts how many times close() is called on it
+
+ /**
+ * Feeds records for two keys through an {@link EvictingWindowOperator} built from
+ * {@code GlobalWindows}, {@code CountTrigger.of(2)} and {@code CountEvictor.of(4)}, then
+ * checks the emitted sums and that the reducer's {@code close()} ran exactly once.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testCountTrigger() throws Exception {
+ // incremented by SumReducer.close(); asserted at the end of the test
+ AtomicInteger closeCalled = new AtomicInteger(0);
+
+ // NOTE(review): presumably the trigger fires on every 2nd record per key and the evictor
+ // keeps at most the 4 newest records - confirm against CountTrigger/CountEvictor
+ final int WINDOW_SIZE = 4;
+ final int WINDOW_SLIDE = 2;
+
+ EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+ GlobalWindows.create(),
+ new TupleKeySelector(),
+ new HeapWindowBuffer.Factory<Tuple2<String, Integer>>(),
+ new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer(closeCalled)),
+ CountTrigger.of(WINDOW_SLIDE),
+ CountEvictor.of(WINDOW_SIZE));
+
+ operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+ new OneInputStreamOperatorTestHarness<>(operator);
+
+ long initialTime = 0L;
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ testHarness.open();
+
+ // The global window actually ignores these timestamps...
+
+ // add elements out-of-order
+ // first batch: 5 records for key2 and 3 records for key1, each with value 1
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
+
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+
+
+ // expected so far: key2 emitted with sums 2 and 4, key1 emitted with sum 2;
+ // all expected records carry Long.MAX_VALUE as their timestamp
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), Long.MAX_VALUE));
+
+ // the comparison is order-insensitive: both sides are sorted with ResultSortComparator
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ // second batch: the 4th record for key1 and the 6th record for key2
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+ // key2 is expected to stay at 4 even though 6 records were sent for it
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.close();
+
+ // closing the harness must reach SumReducer.close() exactly once
+ Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+
+
+ }
+
+ // ------------------------------------------------------------------------
+ // UDFs
+ // ------------------------------------------------------------------------
+
+ public static class SumReducer extends RichReduceFunction<Tuple2<String, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ private boolean openCalled = false;
+
+ private AtomicInteger closeCalled;
+
+ public SumReducer(AtomicInteger closeCalled) {
+ this.closeCalled = closeCalled;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ openCalled = true;
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ closeCalled.incrementAndGet();
+ }
+
+ @Override
+ public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
+ Tuple2<String, Integer> value2) throws Exception {
+ if (!openCalled) {
+ Assert.fail("Open was not called");
+ }
+ return new Tuple2<>(value2.f0, value1.f1 + value2.f1);
+ }
+ }
+
+ /**
+  * Orders harness output so that expected and actual results can be compared independently of
+  * emission order. Records are ordered by timestamp, then by key ({@code f0}), then by value
+  * ({@code f1}). Any comparison that involves a {@link Watermark} reports equality.
+  */
+ @SuppressWarnings("unchecked")
+ private static class ResultSortComparator implements Comparator<Object> {
+     @Override
+     public int compare(Object o1, Object o2) {
+         if (o1 instanceof Watermark || o2 instanceof Watermark) {
+             return 0;
+         }
+
+         StreamRecord<Tuple2<String, Integer>> sr0 = (StreamRecord<Tuple2<String, Integer>>) o1;
+         StreamRecord<Tuple2<String, Integer>> sr1 = (StreamRecord<Tuple2<String, Integer>>) o2;
+
+         // Long.compare rather than casting the difference to int: the difference of two longs
+         // can overflow or be truncated by the cast, which flips the sign of the result.
+         int byTimestamp = Long.compare(sr0.getTimestamp(), sr1.getTimestamp());
+         if (byTimestamp != 0) {
+             return byTimestamp;
+         }
+
+         int byKey = sr0.getValue().f0.compareTo(sr1.getValue().f0);
+         if (byKey != 0) {
+             return byKey;
+         }
+
+         // Integer.compare rather than subtraction, which can overflow for distant values.
+         return Integer.compare(sr0.getValue().f1, sr1.getValue().f1);
+     }
+ }
+
+ /** Keys each tuple by its first field, {@code f0}. */
+ private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
+     private static final long serialVersionUID = 1L;
+
+     @Override
+     public String getKey(Tuple2<String, Integer> value) throws Exception {
+         final String key = value.f0;
+         return key;
+     }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyWindowTranslationTest.java
new file mode 100644
index 0000000..6f42514
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyWindowTranslationTest.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.windowpolicy.Count;
+import org.apache.flink.streaming.api.windowing.windowpolicy.Delta;
+import org.apache.flink.streaming.api.windowing.windowpolicy.Time;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * These tests verify that the api calls on
+ * {@link org.apache.flink.streaming.api.datastream.KeyedWindowDataStream} instantiate
+ * the correct window operator.
+ */
+public class PolicyWindowTranslationTest extends StreamingMultipleProgramsTestBase {
+
+ /**
+ * These tests ensure that the fast aligned time windows operator is used if the
+ * conditions are right.
+ */
+ @Test
+ public void testFastTimeWindows() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DummyReducer reducer = new DummyReducer();
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .keyBy(0)
+ .window(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
+ .reduceWindow(reducer);
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+ Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator);
+
+ DataStream<Tuple2<String, Integer>> window2 = source
+ .keyBy(0)
+ .window(Time.of(1000, TimeUnit.MILLISECONDS))
+ .mapWindow(new KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, Window>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void evaluate(Tuple tuple,
+ Window window,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
+ Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator);
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testNonEvicting() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DummyReducer reducer = new DummyReducer();
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .keyBy(0)
+ .window(Count.of(200))
+ .reduceWindow(reducer);
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+ Assert.assertTrue(operator1 instanceof WindowOperator);
+ WindowOperator winOperator1 = (WindowOperator) operator1;
+ Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof PurgingTrigger);
+ Assert.assertTrue(((PurgingTrigger)winOperator1.getTriggerTemplate()).getNestedTrigger() instanceof CountTrigger);
+ Assert.assertTrue(winOperator1.getWindowAssigner() instanceof GlobalWindows);
+ Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
+
+ DataStream<Tuple2<String, Integer>> window2 = source
+ .keyBy(0)
+ .window(Delta.of(15.0, new DeltaFunction<Object>() {
+ @Override
+ public double getDelta(Object oldDataPoint, Object newDataPoint) {
+ return 0;
+ }
+ }))
+ .mapWindow(new KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, Window>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void evaluate(Tuple tuple,
+ Window window,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
+ Assert.assertTrue(operator2 instanceof WindowOperator);
+ WindowOperator winOperator2 = (WindowOperator) operator2;
+ Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof PurgingTrigger);
+ Assert.assertTrue(((PurgingTrigger)winOperator2.getTriggerTemplate()).getNestedTrigger() instanceof DeltaTrigger);
+ Assert.assertTrue(winOperator2.getWindowAssigner() instanceof GlobalWindows);
+ Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testEvicting() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DummyReducer reducer = new DummyReducer();
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .keyBy(0)
+ .window(Time.of(1000, TimeUnit.MICROSECONDS), Count.of(100))
+ .reduceWindow(reducer);
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+ Assert.assertTrue(operator1 instanceof EvictingWindowOperator);
+ EvictingWindowOperator winOperator1 = (EvictingWindowOperator) operator1;
+ // ensure that the operator sets the current processing time as timestamp
+ Assert.assertTrue(winOperator1.isSetProcessingTime());
+ Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof CountTrigger);
+ Assert.assertTrue(winOperator1.getWindowAssigner() instanceof GlobalWindows);
+ Assert.assertTrue(winOperator1.getEvictor() instanceof TimeEvictor);
+ Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+
+ DataStream<Tuple2<String, Integer>> window2 = source
+ .keyBy(0)
+ .window(Count.of(1000), Delta.of(1.0, new DeltaFunction<Object>() {
+ @Override
+ public double getDelta(Object oldDataPoint, Object newDataPoint) {
+ return 0;
+ }
+ }))
+ .mapWindow(new KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, Window>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void evaluate(Tuple tuple,
+ Window window,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
+ Assert.assertTrue(operator2 instanceof EvictingWindowOperator);
+ EvictingWindowOperator winOperator2 = (EvictingWindowOperator) operator2;
+ Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof DeltaTrigger);
+ Assert.assertTrue(winOperator2.getWindowAssigner() instanceof GlobalWindows);
+ Assert.assertTrue(winOperator2.getEvictor() instanceof CountEvictor);
+ Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+ }
+
+ // ------------------------------------------------------------------------
+ // UDFs
+ // ------------------------------------------------------------------------
+
+ public static class DummyReducer extends RichReduceFunction<Tuple2<String, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
+ return value1;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerWindowTranslationTest.java
new file mode 100644
index 0000000..5078c8c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerWindowTranslationTest.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Translation tests for
+ * {@link org.apache.flink.streaming.api.datastream.KeyedTriggerWindowDataStream}.
+ *
+ * <p>Each test builds keyed window pipelines through the API and then inspects the operator
+ * carried by the resulting transformation (and, where applicable, its trigger, window
+ * assigner, evictor and window buffer factory).
+ */
+public class TriggerWindowTranslationTest extends StreamingMultipleProgramsTestBase {
+
+    /**
+     * Sliding processing-time windows with neither a custom trigger nor an evictor are
+     * expected to use the fast aligned time window operators: the aggregating variant for
+     * a reduce function and the accumulating variant for a window function.
+     */
+    @Test
+    public void testFastTimeWindows() throws Exception {
+        DataStream<Tuple2<String, Integer>> input = helloSource();
+
+        // reduce function -> aggregating operator
+        DataStream<Tuple2<String, Integer>> reduced = input
+                .keyBy(0)
+                .window(SlidingProcessingTimeWindows.of(1000, 100))
+                .reduceWindow(new DummyReducer());
+
+        OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> reduceOperator =
+                operatorOf(reduced);
+        Assert.assertTrue(reduceOperator instanceof AggregatingProcessingTimeWindowOperator);
+
+        // window function -> accumulating operator
+        DataStream<Tuple2<String, Integer>> mapped = input
+                .keyBy(0)
+                .window(SlidingProcessingTimeWindows.of(1000, 100))
+                .mapWindow(noOpWindowFunction());
+
+        OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> mapOperator =
+                operatorOf(mapped);
+        Assert.assertTrue(mapOperator instanceof AccumulatingProcessingTimeWindowOperator);
+    }
+
+    /**
+     * With a custom trigger but no evictor, the pipelines are expected to translate to a
+     * {@link WindowOperator} that carries the configured trigger and assigner. The reduce
+     * case expects a pre-aggregating buffer factory, the window function case a plain
+     * heap buffer factory.
+     */
+    @Test
+    @SuppressWarnings("rawtypes")
+    public void testNonEvicting() throws Exception {
+        DataStream<Tuple2<String, Integer>> input = helloSource();
+
+        // sliding windows + count trigger + reduce function
+        DataStream<Tuple2<String, Integer>> reduced = input
+                .keyBy(0)
+                .window(SlidingProcessingTimeWindows.of(1000, 100))
+                .trigger(CountTrigger.of(100))
+                .reduceWindow(new DummyReducer());
+
+        OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> reduceOperator =
+                operatorOf(reduced);
+        Assert.assertTrue(reduceOperator instanceof WindowOperator);
+        WindowOperator reduceWindowOperator = (WindowOperator) reduceOperator;
+        Assert.assertTrue(reduceWindowOperator.getTriggerTemplate() instanceof CountTrigger);
+        Assert.assertTrue(reduceWindowOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
+        Assert.assertTrue(reduceWindowOperator.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
+
+        // tumbling windows + count trigger + window function
+        DataStream<Tuple2<String, Integer>> mapped = input
+                .keyBy(0)
+                .window(TumblingProcessingTimeWindows.of(1000))
+                .trigger(CountTrigger.of(100))
+                .mapWindow(noOpWindowFunction());
+
+        OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> mapOperator =
+                operatorOf(mapped);
+        Assert.assertTrue(mapOperator instanceof WindowOperator);
+        WindowOperator mapWindowOperator = (WindowOperator) mapOperator;
+        Assert.assertTrue(mapWindowOperator.getTriggerTemplate() instanceof CountTrigger);
+        Assert.assertTrue(mapWindowOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+        Assert.assertTrue(mapWindowOperator.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+    }
+
+    /**
+     * With an evictor set, the pipelines are expected to translate to an
+     * {@link EvictingWindowOperator} that carries the configured evictor and assigner and a
+     * plain heap buffer factory. The first pipeline sets no trigger and is expected to end
+     * up with a {@link ProcessingTimeTrigger}; the second sets a {@link CountTrigger}.
+     */
+    @Test
+    @SuppressWarnings("rawtypes")
+    public void testEvicting() throws Exception {
+        DataStream<Tuple2<String, Integer>> input = helloSource();
+
+        // sliding windows + count evictor + reduce function (no explicit trigger)
+        DataStream<Tuple2<String, Integer>> reduced = input
+                .keyBy(0)
+                .window(SlidingProcessingTimeWindows.of(1000, 100))
+                .evictor(CountEvictor.of(100))
+                .reduceWindow(new DummyReducer());
+
+        OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> reduceOperator =
+                operatorOf(reduced);
+        Assert.assertTrue(reduceOperator instanceof EvictingWindowOperator);
+        EvictingWindowOperator reduceEvictingOperator = (EvictingWindowOperator) reduceOperator;
+        Assert.assertTrue(reduceEvictingOperator.getTriggerTemplate() instanceof ProcessingTimeTrigger);
+        Assert.assertTrue(reduceEvictingOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
+        Assert.assertTrue(reduceEvictingOperator.getEvictor() instanceof CountEvictor);
+        Assert.assertTrue(reduceEvictingOperator.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+
+        // tumbling windows + count trigger + time evictor + window function
+        DataStream<Tuple2<String, Integer>> mapped = input
+                .keyBy(0)
+                .window(TumblingProcessingTimeWindows.of(1000))
+                .trigger(CountTrigger.of(100))
+                .evictor(TimeEvictor.of(100))
+                .mapWindow(noOpWindowFunction());
+
+        OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> mapOperator =
+                operatorOf(mapped);
+        Assert.assertTrue(mapOperator instanceof EvictingWindowOperator);
+        EvictingWindowOperator mapEvictingOperator = (EvictingWindowOperator) mapOperator;
+        Assert.assertTrue(mapEvictingOperator.getTriggerTemplate() instanceof CountTrigger);
+        Assert.assertTrue(mapEvictingOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+        Assert.assertTrue(mapEvictingOperator.getEvictor() instanceof TimeEvictor);
+        Assert.assertTrue(mapEvictingOperator.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Helpers
+    // ------------------------------------------------------------------------
+
+    /**
+     * Creates the two-element {@code ("hello", 1)}, {@code ("hello", 2)} input stream that
+     * every test starts from.
+     */
+    private static DataStream<Tuple2<String, Integer>> helloSource() {
+        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+        return environment.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+    }
+
+    /**
+     * Returns the operator held by the transformation of {@code stream}. The transformation
+     * is cast to a {@link OneInputTransformation}, which is what the tests expect every
+     * window call to produce.
+     */
+    @SuppressWarnings("unchecked")
+    private static OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operatorOf(
+            DataStream<Tuple2<String, Integer>> stream) {
+        OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transformation =
+                (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) stream.getTransformation();
+        return transformation.getOperator();
+    }
+
+    /**
+     * Creates a fresh window function whose {@code evaluate} does nothing; the tests only
+     * look at the operator chosen for it, never at its output.
+     */
+    private KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow> noOpWindowFunction() {
+        return new KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public void evaluate(
+                    Tuple tuple,
+                    TimeWindow window,
+                    Iterable<Tuple2<String, Integer>> values,
+                    Collector<Tuple2<String, Integer>> out) throws Exception {
+                // intentionally empty
+            }
+        };
+    }
+
+    // ------------------------------------------------------------------------
+    //  UDFs
+    // ------------------------------------------------------------------------
+
+    /**
+     * Reduce function that always returns the first of its two arguments.
+     */
+    public static class DummyReducer extends RichReduceFunction<Tuple2<String, Integer>> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Tuple2<String, Integer> reduce(
+                Tuple2<String, Integer> value1,
+                Tuple2<String, Integer> value2) throws Exception {
+            return value1;
+        }
+    }
+}