You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/20 18:42:22 UTC
[3/4] flink git commit: [FLINK-2864] Make State of General-Purpose
Window Operators Fault-Tolerant
http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index aecfd5d..7ab33cf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -18,12 +18,15 @@
package org.apache.flink.streaming.runtime.operators.windowing;
import com.google.common.annotations.VisibleForTesting;
-
+import org.apache.commons.lang.SerializationUtils;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -37,11 +40,16 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -65,26 +73,70 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(NonKeyedWindowOperator.class);
+ // ------------------------------------------------------------------------
+ // Configuration values and user functions
+ // ------------------------------------------------------------------------
private final WindowAssigner<? super IN, W> windowAssigner;
- private final Trigger<? super IN, ? super W> triggerTemplate;
+ /**
+ * The trigger shared by all windows; the per-window trigger state and timer registrations
+ * are kept in each window's {@code Context}.
+ */
+ private final Trigger<? super IN, ? super W> trigger;
+
private final WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory;
- protected transient Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> windows;
+ /**
+ * If this is true, the current processing time is set as the timestamp of incoming elements.
+ * This is for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor}
+ * if eviction should happen based on processing time.
+ */
+ private boolean setProcessingTime = false;
- private transient Map<Long, Set<TriggerContext>> processingTimeTimers;
- private transient Map<Long, Set<TriggerContext>> watermarkTimers;
+ /**
+ * This is used to copy the incoming element because it can be put into several window
+ * buffers. It is also used to write/read the buffered elements in checkpoints.
+ */
+ private TypeSerializer<IN> inputSerializer;
+ /**
+ * For serializing the window in checkpoints.
+ */
+ private final TypeSerializer<W> windowSerializer;
+
+ // ------------------------------------------------------------------------
+ // State that is not checkpointed
+ // ------------------------------------------------------------------------
+
+ /**
+ * Processing time timers that are currently in-flight, by timer time.
+ */
+ private transient Map<Long, Set<Context>> processingTimeTimers;
+
+ /**
+ * Current waiting watermark callbacks, by timer time.
+ */
+ private transient Map<Long, Set<Context>> watermarkTimers;
+
+ /**
+ * This is given to the {@code AllWindowFunction} for emitting elements with a given timestamp.
+ */
protected transient TimestampedCollector<OUT> timestampedCollector;
- private boolean setProcessingTime = false;
+ // ------------------------------------------------------------------------
+ // State that needs to be checkpointed
+ // ------------------------------------------------------------------------
- private TypeSerializer<IN> inputSerializer;
+ /**
+ * The windows (panes) that are currently in-flight. Each pane has a {@code Context} that
+ * holds its {@code WindowBuffer}, the trigger's key/value state and its timer registrations.
+ */
+ protected transient Map<W, Context> windows;
+ /**
+ * Creates a new {@code NonKeyedWindowOperator} based on the given policies and user functions.
+ *
+ * @param windowAssigner assigns every incoming element to one or more windows
+ * @param windowSerializer serializes windows when the operator state is checkpointed or restored
+ * @param windowBufferFactory creates the buffer that holds the elements of each window
+ * @param windowFunction applied to the buffered elements of a window when it is emitted
+ * @param trigger decides when a window is emitted and/or purged; one instance serves all windows
+ */
public NonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
+ TypeSerializer<W> windowSerializer,
WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
AllWindowFunction<IN, OUT, W> windowFunction,
Trigger<? super IN, ? super W> trigger) {
@@ -92,25 +144,23 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
super(windowFunction);
this.windowAssigner = requireNonNull(windowAssigner);
+ // NOTE(review): unlike the other arguments this one is not null-checked; a null serializer
+ // would only surface as an NPE when a checkpoint is written or restored.
+ this.windowSerializer = windowSerializer;
this.windowBufferFactory = requireNonNull(windowBufferFactory);
- this.triggerTemplate = requireNonNull(trigger);
+ // a single trigger instance serves all windows (it is no longer duplicate()d per window)
+ this.trigger = requireNonNull(trigger);
setChainingStrategy(ChainingStrategy.ALWAYS);
}
+ /**
+ * Creates the input serializer, which is used to copy incoming elements and to write/read
+ * the buffered elements in checkpoints.
+ */
@Override
@SuppressWarnings("unchecked")
- public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
+ public final void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
inputSerializer = (TypeSerializer<IN>) type.createSerializer(executionConfig);
}
+ /**
+ * Opens the operator. The window and timer maps are only created if restoreState() has not
+ * already filled them, and the timers of restored windows are registered again.
+ */
@Override
- public void open() throws Exception {
+ public final void open() throws Exception {
super.open();
- windows = new HashMap<>();
- watermarkTimers = new HashMap<>();
- processingTimeTimers = new HashMap<>();
+ // window and timer maps are created further down: restoreState() may already have set them
timestampedCollector = new TimestampedCollector<>(output);
if (inputSerializer == null) {
@@ -119,14 +169,47 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
windowBufferFactory.setRuntimeContext(getRuntimeContext());
windowBufferFactory.open(getUserFunctionParameters());
+
+ // these could already be initialized from restoreState()
+ if (watermarkTimers == null) {
+ watermarkTimers = new HashMap<>();
+ }
+ if (processingTimeTimers == null) {
+ processingTimeTimers = new HashMap<>();
+ }
+ if (windows == null) {
+ windows = new HashMap<>();
+ }
+
+ // re-register timers that this window context had set
+ // (the timer fields are restored with each Context; -1 means "no timer").
+ // NOTE(review): a timer registered for time 0 would be skipped by the "> 0" checks below.
+ for (Context context: windows.values()) {
+ if (context.processingTimeTimer > 0) {
+ Set<Context> triggers = processingTimeTimers.get(context.processingTimeTimer);
+ if (triggers == null) {
+ // only the first context for a given time registers the callback with the runtime
+ getRuntimeContext().registerTimer(context.processingTimeTimer, NonKeyedWindowOperator.this);
+ triggers = new HashSet<>();
+ processingTimeTimers.put(context.processingTimeTimer, triggers);
+ }
+ triggers.add(context);
+ }
+ if (context.watermarkTimer > 0) {
+ // watermark timers need no runtime registration: processWatermark() scans this map
+ Set<Context> triggers = watermarkTimers.get(context.watermarkTimer);
+ if (triggers == null) {
+ triggers = new HashSet<>();
+ watermarkTimers.put(context.watermarkTimer, triggers);
+ }
+ triggers.add(context);
+ }
+
+ }
}
+ /**
+ * Closes the operator. Every window that is still held is emitted one last time, without
+ * consulting the trigger, before all windows are discarded.
+ */
@Override
- public void close() throws Exception {
+ public final void close() throws Exception {
super.close();
// emit the elements that we still keep
- for (W window: windows.keySet()) {
- emitWindow(window, false);
+ for (Context window: windows.values()) {
+ emitWindow(window);
}
windows.clear();
windowBufferFactory.close();
@@ -134,58 +217,60 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
+ /**
+ * Assigns the element to its windows. For every window a copy of the element is stored in
+ * that window's buffer (the window's Context is created on first use) and the shared trigger
+ * decides what happens to the window.
+ */
@Override
@SuppressWarnings("unchecked")
- public void processElement(StreamRecord<IN> element) throws Exception {
+ public final void processElement(StreamRecord<IN> element) throws Exception {
if (setProcessingTime) {
element.replace(element.getValue(), System.currentTimeMillis());
}
+
Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
for (W window: elementWindows) {
- Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger = windows.get(window);
- if (bufferAndTrigger == null) {
- bufferAndTrigger = new Tuple2<>();
- bufferAndTrigger.f0 = windowBufferFactory.create();
- bufferAndTrigger.f1 = new TriggerContext(window, triggerTemplate.duplicate());
- windows.put(window, bufferAndTrigger);
+ Context context = windows.get(window);
+ if (context == null) {
+ WindowBuffer<IN> windowBuffer = windowBufferFactory.create();
+ context = new Context(window, windowBuffer);
+ windows.put(window, context);
}
+ // copied because the same element can be stored in several window buffers
StreamRecord<IN> elementCopy = new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp());
- bufferAndTrigger.f0.storeElement(elementCopy);
- Trigger.TriggerResult triggerResult = bufferAndTrigger.f1.trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, bufferAndTrigger.f1);
+ context.windowBuffer.storeElement(elementCopy);
+ Trigger.TriggerResult triggerResult = trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, context);
processTriggerResult(triggerResult, window);
}
}
- protected void emitWindow(W window, boolean purge) throws Exception {
- timestampedCollector.setTimestamp(window.getEnd());
-
- Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger;
- if (purge) {
- bufferAndTrigger = windows.remove(window);
- } else {
- bufferAndTrigger = windows.get(window);
- }
-
- if (bufferAndTrigger == null) {
- LOG.debug("Window {} already gone.", window);
- return;
- }
-
+ /**
+ * Applies the window function to all elements buffered for the context's window. Results are
+ * emitted with the window's maxTimestamp(). The window is not removed here; for
+ * FIRE_AND_PURGE the caller removes it from {@code windows} before emitting.
+ */
+ protected void emitWindow(Context context) throws Exception {
+ timestampedCollector.setTimestamp(context.window.maxTimestamp());
userFunction.apply(
- window,
- bufferAndTrigger.f0.getUnpackedElements(),
+ context.window,
+ context.windowBuffer.getUnpackedElements(),
timestampedCollector);
}
+ /**
+ * Acts on a trigger decision for {@code window}: FIRE emits the window and keeps it,
+ * FIRE_AND_PURGE removes the window and then emits it, CONTINUE does nothing.
+ * A window that no longer exists is only logged at debug level.
+ */
private void processTriggerResult(Trigger.TriggerResult triggerResult, W window) throws Exception {
switch (triggerResult) {
- case FIRE:
- emitWindow(window, false);
+ case FIRE: {
+ Context context = windows.get(window);
+ if (context == null) {
+ LOG.debug("Window {} already gone.", window);
+ return;
+ }
+
+
+ emitWindow(context);
break;
+ }
- case FIRE_AND_PURGE:
- emitWindow(window, true);
+ case FIRE_AND_PURGE: {
+ // removed before emitting, so the window's buffer and trigger state are dropped
+ Context context = windows.remove(window);
+ if (context == null) {
+ LOG.debug("Window {} already gone.", window);
+ return;
+ }
+
+ emitWindow(context);
break;
+ }
case CONTINUE:
// ingore
@@ -193,14 +278,14 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
}
+ /**
+ * Fires every watermark timer whose time is <= the watermark. The registered time (not the
+ * watermark) is handed to Context.onEventTime(), which only fires a context for the timer it
+ * currently has set.
+ *
+ * NOTE(review): if a trigger registers a watermark timer for a new time from inside onTime(),
+ * watermarkTimers is modified while it is being iterated here.
+ */
@Override
- public void processWatermark(Watermark mark) throws Exception {
+ public final void processWatermark(Watermark mark) throws Exception {
Set<Long> toRemove = new HashSet<>();
- for (Map.Entry<Long, Set<TriggerContext>> triggers: watermarkTimers.entrySet()) {
+ for (Map.Entry<Long, Set<Context>> triggers: watermarkTimers.entrySet()) {
if (triggers.getKey() <= mark.getTimestamp()) {
- for (TriggerContext trigger: triggers.getValue()) {
- Trigger.TriggerResult triggerResult = trigger.trigger.onTime(mark.getTimestamp(), trigger);
- processTriggerResult(triggerResult, trigger.window);
+ for (Context context: triggers.getValue()) {
+ Trigger.TriggerResult triggerResult = context.onEventTime(triggers.getKey());
+ processTriggerResult(triggerResult, context.window);
}
toRemove.add(triggers.getKey());
}
@@ -213,14 +298,14 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
}
+ /**
+ * Processing-time callback from the runtime: fires every pending processing-time timer whose
+ * time is <= {@code time}.
+ *
+ * NOTE(review): if a trigger registers a timer for a new time from inside onTime(),
+ * processingTimeTimers is modified while it is being iterated here.
+ */
@Override
- public void trigger(long time) throws Exception {
+ public final void trigger(long time) throws Exception {
Set<Long> toRemove = new HashSet<>();
- for (Map.Entry<Long, Set<TriggerContext>> triggers: processingTimeTimers.entrySet()) {
+ for (Map.Entry<Long, Set<Context>> triggers: processingTimeTimers.entrySet()) {
- if (triggers.getKey() < time) {
- for (TriggerContext trigger: triggers.getValue()) {
- Trigger.TriggerResult triggerResult = trigger.trigger.onTime(time, trigger);
- processTriggerResult(triggerResult, trigger.window);
+ // "<=" and the registered time, as in processWatermark(): Context.onProcessingTime() only
+ // fires when it is handed exactly the time the context registered. With a strict "<" the
+ // entry for that time is skipped by the callback for that time (the runtime is presumably
+ // called back with the time given to registerTimer()), and when a later callback reaches it
+ // the larger "time" no longer matches, so the timer would never fire.
+ if (triggers.getKey() <= time) {
+ for (Context context: triggers.getValue()) {
+ Trigger.TriggerResult triggerResult = context.onProcessingTime(triggers.getKey());
+ processTriggerResult(triggerResult, context.window);
}
toRemove.add(triggers.getKey());
}
@@ -231,35 +316,139 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
}
}
- protected class TriggerContext implements Trigger.TriggerContext {
- Trigger<? super IN, ? super W> trigger;
- W window;
+ /**
+ * A context object that is given to {@code Trigger} functions to allow them to register
+ * timer/watermark callbacks and to keep key/value state. There is one context per window;
+ * it also owns the window's element buffer.
+ */
+ protected class Context implements Trigger.TriggerContext {
+ protected W window;
+
+ protected WindowBuffer<IN> windowBuffer;
+
+ /** Trigger state, by state name; Java-serialized into checkpoints. */
+ protected HashMap<String, Serializable> state;
+
+ // use these to only allow one timer in flight at a time of each type
+ // if the trigger registers another timer this value here will be overwritten,
+ // the timer is not removed from the set of in-flight timers to improve performance.
+ // When a trigger fires it is just checked against the last timer that was set.
+ // -1 means "no timer set".
+ protected long watermarkTimer;
+ protected long processingTimeTimer;
- public TriggerContext(W window, Trigger<? super IN, ? super W> trigger) {
+ /** Creates the context of a new window with empty trigger state and no timers. */
+ public Context(
+ W window,
+ WindowBuffer<IN> windowBuffer) {
this.window = window;
- this.trigger = trigger;
+ this.windowBuffer = windowBuffer;
+ state = new HashMap<>();
+
+ this.watermarkTimer = -1;
+ this.processingTimeTimer = -1;
+ }
+
+
+ /**
+ * Restores a context from a checkpoint stream, reading exactly what
+ * {@link #writeToState} writes. The elements are put into a fresh buffer from the
+ * window buffer factory.
+ */
+ @SuppressWarnings("unchecked")
+ protected Context(DataInputView in) throws Exception {
+ this.window = windowSerializer.deserialize(in);
+ this.watermarkTimer = in.readLong();
+ this.processingTimeTimer = in.readLong();
+
+ int stateSize = in.readInt();
+ byte[] stateData = new byte[stateSize];
+ // readFully: a plain read(byte[]) may return before the whole state blob has been read
+ in.readFully(stateData);
+ ByteArrayInputStream bais = new ByteArrayInputStream(stateData);
+ // NOTE(review): SerializationUtils presumably resolves classes without the user-code
+ // classloader (restoreState() has getUserCodeClassloader()); user-defined state classes
+ // may fail to load here.
+ state = (HashMap<String, Serializable>) SerializationUtils.deserialize(bais);
+
+ this.windowBuffer = windowBufferFactory.create();
+ int numElements = in.readInt();
+ MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
+ for (int i = 0; i < numElements; i++) {
+ windowBuffer.storeElement(recordSerializer.deserialize(in).<IN>asRecord());
+ }
+ }
+
+ /**
+ * Writes this context as: the window, the watermark timer, the processing-time timer,
+ * the length-prefixed Java-serialized trigger state, and the element count followed by
+ * the buffered elements.
+ */
+ protected void writeToState(StateBackend.CheckpointStateOutputView out) throws IOException {
+ windowSerializer.serialize(window, out);
+ out.writeLong(watermarkTimer);
+ out.writeLong(processingTimeTimer);
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ SerializationUtils.serialize(state, baos);
+ out.writeInt(baos.size());
+ out.write(baos.toByteArray(), 0, baos.size());
+
+ MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
+ out.writeInt(windowBuffer.size());
+ for (StreamRecord<IN> element: windowBuffer.getElements()) {
+ recordSerializer.serialize(element, out);
+ }
+ }
+
+ /**
+ * Returns a view on the entry {@code name} of this window's trigger state. The first
+ * value() call on a missing entry stores and returns {@code defaultState}.
+ * NOTE(review): the same defaultState object is stored for every window, so it should not
+ * be mutated in place.
+ */
+ @SuppressWarnings("unchecked")
+ public <S extends Serializable> OperatorState<S> getKeyValueState(final String name, final S defaultState) {
+ return new OperatorState<S>() {
+ @Override
+ public S value() throws IOException {
+ Serializable value = state.get(name);
+ if (value == null) {
+ state.put(name, defaultState);
+ value = defaultState;
+ }
+ return (S) value;
+ }
+
+ @Override
+ public void update(S value) throws IOException {
+ state.put(name, value);
+ }
+ };
}
@Override
public void registerProcessingTimeTimer(long time) {
- Set<TriggerContext> triggers = processingTimeTimers.get(time);
+ if (this.processingTimeTimer == time) {
+ // we already have set a trigger for that time
+ return;
+ }
+ Set<Context> triggers = processingTimeTimers.get(time);
if (triggers == null) {
getRuntimeContext().registerTimer(time, NonKeyedWindowOperator.this);
triggers = new HashSet<>();
processingTimeTimers.put(time, triggers);
}
+ this.processingTimeTimer = time;
triggers.add(this);
}
@Override
public void registerWatermarkTimer(long time) {
- Set<TriggerContext> triggers = watermarkTimers.get(time);
+ if (watermarkTimer == time) {
+ // we already have set a trigger for that time
+ return;
+ }
+ Set<Context> triggers = watermarkTimers.get(time);
if (triggers == null) {
triggers = new HashSet<>();
watermarkTimers.put(time, triggers);
}
+ this.watermarkTimer = time;
triggers.add(this);
}
+
+ /**
+ * Calls the trigger if {@code time} is the processing-time timer this context currently
+ * has set; stale registrations yield CONTINUE.
+ */
+ public Trigger.TriggerResult onProcessingTime(long time) throws Exception {
+ if (time == processingTimeTimer) {
+ // the timer has fired: clear it, so a checkpoint taken afterwards does not make open()
+ // register (and fire) it again, and so the trigger may register the same time again
+ processingTimeTimer = -1;
+ return trigger.onTime(time, this);
+ } else {
+ return Trigger.TriggerResult.CONTINUE;
+ }
+ }
+
+ /**
+ * Calls the trigger if {@code time} is the watermark timer this context currently has set;
+ * stale registrations yield CONTINUE.
+ */
+ public Trigger.TriggerResult onEventTime(long time) throws Exception {
+ if (time == watermarkTimer) {
+ // the timer has fired: clear it (see onProcessingTime)
+ watermarkTimer = -1;
+ return trigger.onTime(time, this);
+ } else {
+ return Trigger.TriggerResult.CONTINUE;
+ }
+ }
}
/**
@@ -274,7 +463,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
}
+ /**
+ * Hands the output type information to the window function if it implements
+ * {@code OutputTypeConfigurable} (presumably via its setOutputType).
+ */
@Override
- public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
+ public final void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
if (userFunction instanceof OutputTypeConfigurable) {
@SuppressWarnings("unchecked")
OutputTypeConfigurable<OUT> typeConfigurable = (OutputTypeConfigurable<OUT>) userFunction;
@@ -283,12 +472,59 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
}
// ------------------------------------------------------------------------
+ // Checkpointing
+ // ------------------------------------------------------------------------
+
+ /**
+ * Writes the number of in-flight windows followed by each window's Context into a
+ * checkpoint stream and stores the resulting handle as the operator state. The timer maps
+ * are not written; open() rebuilds them from the timer fields of each Context.
+ * NOTE(review): if writing fails, {@code out} is not closed.
+ */
+ @Override
+ public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+ StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+
+ // we write the panes with the key/value maps into the stream
+ StateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+
+ int numWindows = windows.size();
+ out.writeInt(numWindows);
+ for (Context context: windows.values()) {
+ context.writeToState(out);
+ }
+
+ taskState.setOperatorState(out.closeAndGetHandle());
+ return taskState;
+ }
+
+ /**
+ * Reads back what snapshotOperatorState() wrote: every window's Context (window, timers,
+ * trigger state, buffered elements) is recreated. Runs before open(), which then
+ * re-registers the restored timers.
+ */
+ @Override
+ public void restoreState(StreamTaskState taskState) throws Exception {
+ super.restoreState(taskState);
+
+
+ @SuppressWarnings("unchecked")
+ StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
+ DataInputView in = inputState.getState(getUserCodeClassloader());
+
+ int numWindows = in.readInt();
+ this.windows = new HashMap<>(numWindows);
+ // left empty here; open() fills them from the timer fields of the restored contexts
+ this.processingTimeTimers = new HashMap<>();
+ this.watermarkTimers = new HashMap<>();
+
+ for (int j = 0; j < numWindows; j++) {
+ Context context = new Context(in);
+ windows.put(context.window, context);
+ }
+ }
+
+
// ------------------------------------------------------------------------
// Getters for testing
// ------------------------------------------------------------------------
+ /** Returns whether incoming elements are stamped with the current processing time. */
@VisibleForTesting
- public Trigger<? super IN, ? super W> getTriggerTemplate() {
- return triggerTemplate;
+ public boolean isSetProcessingTime() {
+ return setProcessingTime;
+ }
+
+ /** Returns the trigger that is shared by all windows. */
+ @VisibleForTesting
+ public Trigger<? super IN, ? super W> getTrigger() {
+ return trigger;
}
@VisibleForTesting
@@ -300,9 +536,4 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
public WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> getWindowBufferFactory() {
+ // this factory creates the buffer of every new window and of every restored window
return windowBufferFactory;
}
-
- @VisibleForTesting
- public boolean isSetProcessingTime() {
- return setProcessingTime;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 82a3f9a..0b3274f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -18,13 +18,16 @@
package org.apache.flink.streaming.runtime.operators.windowing;
import com.google.common.annotations.VisibleForTesting;
-
+import org.apache.commons.lang.SerializationUtils;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -38,10 +41,16 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -84,49 +93,77 @@ public class WindowOperator<K, IN, OUT, W extends Window>
private static final Logger LOG = LoggerFactory.getLogger(WindowOperator.class);
+ // ------------------------------------------------------------------------
+ // Configuration values and user functions
+ // ------------------------------------------------------------------------
+
private final WindowAssigner<? super IN, W> windowAssigner;
private final KeySelector<IN, K> keySelector;
- private final Trigger<? super IN, ? super W> triggerTemplate;
+ /**
+ * The trigger shared by all keys and windows; per-window buffers and timer registrations
+ * are kept in each window's {@code Context}.
+ */
+ private final Trigger<? super IN, ? super W> trigger;
private final WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory;
/**
- * The windows (panes) that are currently in-flight. Each pane has a {@code WindowBuffer}
- * and a {@code TriggerContext} that stores the {@code Trigger} for that pane.
+ * If this is true, the current processing time is set as the timestamp of incoming elements.
+ * This is for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor}
+ * if eviction should happen based on processing time.
+ */
+ private boolean setProcessingTime = false;
+
+ /**
+ * This is used to copy the incoming element because it can be put into several window
+ * buffers.
+ */
+ private TypeSerializer<IN> inputSerializer;
+
+ /**
+ * For serializing the key in checkpoints.
*/
- protected transient Map<K, Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>>> windows;
+ private final TypeSerializer<K> keySerializer;
+
+ /**
+ * For serializing the window in checkpoints.
+ */
+ private final TypeSerializer<W> windowSerializer;
+
+ // ------------------------------------------------------------------------
+ // State that is not checkpointed
+ // ------------------------------------------------------------------------
/**
* Processing time timers that are currently in-flight.
*/
- private transient Map<Long, Set<TriggerContext>> processingTimeTimers;
+ private transient Map<Long, Set<Context>> processingTimeTimers;
/**
* Current waiting watermark callbacks.
*/
- private transient Map<Long, Set<TriggerContext>> watermarkTimers;
+ private transient Map<Long, Set<Context>> watermarkTimers;
/**
* This is given to the {@code WindowFunction} for emitting elements with a given timestamp.
*/
protected transient TimestampedCollector<OUT> timestampedCollector;
+ // ------------------------------------------------------------------------
+ // State that needs to be checkpointed
+ // ------------------------------------------------------------------------
+
/**
- * If this is true. The current processing time is set as the timestamp of incoming elements.
- * This for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor}
- * if eviction should happen based on processing time.
+ * The windows (panes) that are currently in-flight, grouped by key. Each pane has a
+ * {@code Context} that holds its {@code WindowBuffer} and its timer registrations.
*/
- private boolean setProcessingTime = false;
-
- private TypeSerializer<IN> inputSerializer;
+ protected transient Map<K, Map<W, Context>> windows;
/**
* Creates a new {@code WindowOperator} based on the given policies and user functions.
+ *
+ * @param windowAssigner assigns every incoming element to one or more windows
+ * @param windowSerializer serializes windows when the operator state is checkpointed
+ * @param keySelector extracts the key that the windows of an element are grouped by
+ * @param keySerializer serializes keys when the operator state is checkpointed
+ * @param windowBufferFactory creates the buffer that holds the elements of each window
+ * @param windowFunction applied to the key, window and buffered elements when a window is emitted
+ * @param trigger decides when a window is emitted and/or purged; one instance serves all windows
*/
public WindowOperator(WindowAssigner<? super IN, W> windowAssigner,
+ TypeSerializer<W> windowSerializer,
KeySelector<IN, K> keySelector,
+ TypeSerializer<K> keySerializer,
WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
WindowFunction<IN, OUT, K, W> windowFunction,
Trigger<? super IN, ? super W> trigger) {
@@ -134,27 +171,26 @@ public class WindowOperator<K, IN, OUT, W extends Window>
super(windowFunction);
this.windowAssigner = requireNonNull(windowAssigner);
+ // NOTE(review): not null-checked, unlike keySerializer below; a null window serializer would
+ // only surface when the operator state is checkpointed or restored.
+ this.windowSerializer = windowSerializer;
this.keySelector = requireNonNull(keySelector);
+ this.keySerializer = requireNonNull(keySerializer);
this.windowBufferFactory = requireNonNull(windowBufferFactory);
- this.triggerTemplate = requireNonNull(trigger);
+ // a single trigger instance serves all windows (it is no longer duplicate()d per window)
+ this.trigger = requireNonNull(trigger);
setChainingStrategy(ChainingStrategy.ALWAYS);
-// forceInputCopy();
}
+ /**
+ * Creates the input serializer, which is used to copy incoming elements before they are
+ * stored in (possibly several) window buffers.
+ */
@Override
@SuppressWarnings("unchecked")
- public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
+ public final void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
inputSerializer = (TypeSerializer<IN>) type.createSerializer(executionConfig);
}
+ /**
+ * Opens the operator. The window and timer maps are only created if restoreState() has not
+ * already filled them, and the timers of restored windows are registered again.
+ */
@Override
- public void open() throws Exception {
+ public final void open() throws Exception {
super.open();
- windows = new HashMap<>();
- watermarkTimers = new HashMap<>();
- processingTimeTimers = new HashMap<>();
+
timestampedCollector = new TimestampedCollector<>(output);
if (inputSerializer == null) {
@@ -163,17 +199,53 @@ public class WindowOperator<K, IN, OUT, W extends Window>
windowBufferFactory.setRuntimeContext(getRuntimeContext());
windowBufferFactory.open(getUserFunctionParameters());
+
+
+ // these could already be initialized from restoreState()
+ if (watermarkTimers == null) {
+ watermarkTimers = new HashMap<>();
+ }
+ if (processingTimeTimers == null) {
+ processingTimeTimers = new HashMap<>();
+ }
+ if (windows == null) {
+ windows = new HashMap<>();
+ }
+
+ // re-register timers that this window context had set
+ // NOTE(review): the "> 0" checks treat 0 and negative values as "no timer"; confirm this
+ // matches the sentinel the Context uses for an unset timer.
+ for (Map.Entry<K, Map<W, Context>> entry: windows.entrySet()) {
+ Map<W, Context> keyWindows = entry.getValue();
+ for (Context context: keyWindows.values()) {
+ if (context.processingTimeTimer > 0) {
+ Set<Context> triggers = processingTimeTimers.get(context.processingTimeTimer);
+ if (triggers == null) {
+ // only the first context for a given time registers the callback with the runtime
+ getRuntimeContext().registerTimer(context.processingTimeTimer, WindowOperator.this);
+ triggers = new HashSet<>();
+ processingTimeTimers.put(context.processingTimeTimer, triggers);
+ }
+ triggers.add(context);
+ }
+ if (context.watermarkTimer > 0) {
+ // watermark timers need no runtime registration: processWatermark() scans this map
+ Set<Context> triggers = watermarkTimers.get(context.watermarkTimer);
+ if (triggers == null) {
+ triggers = new HashSet<>();
+ watermarkTimers.put(context.watermarkTimer, triggers);
+ }
+ triggers.add(context);
+ }
+
+ }
+ }
}
+ /**
+ * Closes the operator. Every window of every key that is still held is emitted one last
+ * time, without consulting the trigger, before all windows are discarded.
+ */
@Override
- public void close() throws Exception {
+ public final void close() throws Exception {
super.close();
// emit the elements that we still keep
- for (Map.Entry<K, Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>>> entry: windows.entrySet()) {
- K key = entry.getKey();
- Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = entry.getValue();
- for (W window: keyWindows.keySet()) {
- emitWindow(key, window, false);
+ for (Map.Entry<K, Map<W, Context>> entry: windows.entrySet()) {
+ Map<W, Context> keyWindows = entry.getValue();
+ for (Context window: keyWindows.values()) {
+ emitWindow(window);
}
}
windows.clear();
@@ -182,77 +254,81 @@ public class WindowOperator<K, IN, OUT, W extends Window>
+ /**
+ * Assigns the element to its windows under the element's key. For every window a copy of the
+ * element is stored in that window's buffer (the window's Context is created on first use)
+ * and the shared trigger decides what happens to the window.
+ */
@Override
@SuppressWarnings("unchecked")
- public void processElement(StreamRecord<IN> element) throws Exception {
+ public final void processElement(StreamRecord<IN> element) throws Exception {
if (setProcessingTime) {
element.replace(element.getValue(), System.currentTimeMillis());
}
+
Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
K key = keySelector.getKey(element.getValue());
- Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = windows.get(key);
+ // NOTE(review): the per-key map is created even when the assigner returned no windows
+ Map<W, Context> keyWindows = windows.get(key);
if (keyWindows == null) {
keyWindows = new HashMap<>();
windows.put(key, keyWindows);
}
for (W window: elementWindows) {
- Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger = keyWindows.get(window);
- if (bufferAndTrigger == null) {
- bufferAndTrigger = new Tuple2<>();
- bufferAndTrigger.f0 = windowBufferFactory.create();
- bufferAndTrigger.f1 = new TriggerContext(key, window, triggerTemplate.duplicate());
- keyWindows.put(window, bufferAndTrigger);
+ Context context = keyWindows.get(window);
+ if (context == null) {
+ WindowBuffer<IN> windowBuffer = windowBufferFactory.create();
+ context = new Context(key, window, windowBuffer);
+ keyWindows.put(window, context);
}
+ // copied because the same element can be stored in several window buffers
StreamRecord<IN> elementCopy = new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp());
- bufferAndTrigger.f0.storeElement(elementCopy);
- Trigger.TriggerResult triggerResult = bufferAndTrigger.f1.trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, bufferAndTrigger.f1);
+ context.windowBuffer.storeElement(elementCopy);
+ Trigger.TriggerResult triggerResult = trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, context);
processTriggerResult(triggerResult, key, window);
}
}
- protected void emitWindow(K key, W window, boolean purge) throws Exception {
- timestampedCollector.setTimestamp(window.getEnd());
-
- Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = windows.get(key);
-
- if (keyWindows == null) {
- LOG.debug("Window {} for key {} already gone.", window, key);
- return;
- }
-
- Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger;
- if (purge) {
- bufferAndTrigger = keyWindows.remove(window);
- } else {
- bufferAndTrigger = keyWindows.get(window);
- }
-
- if (bufferAndTrigger == null) {
- LOG.debug("Window {} for key {} already gone.", window, key);
- return;
- }
+ /**
+ * Applies the window function to the key, the window and all elements buffered for the
+ * context's window. Results are emitted with the window's maxTimestamp(). Nothing is removed
+ * here; for FIRE_AND_PURGE the caller removes the window (and an emptied key) beforehand.
+ */
+ protected void emitWindow(Context context) throws Exception {
+ timestampedCollector.setTimestamp(context.window.maxTimestamp());
-
- userFunction.apply(key,
- window,
- bufferAndTrigger.f0.getUnpackedElements(),
+ userFunction.apply(context.key,
+ context.window,
+ context.windowBuffer.getUnpackedElements(),
timestampedCollector);
-
- if (keyWindows.isEmpty()) {
- windows.remove(key);
- }
}
private void processTriggerResult(Trigger.TriggerResult triggerResult, K key, W window) throws Exception {
switch (triggerResult) {
- case FIRE:
- emitWindow(key, window, false);
+ case FIRE: {
+ Map<W, Context> keyWindows = windows.get(key);
+ if (keyWindows == null) {
+ LOG.debug("Window {} for key {} already gone.", window, key);
+ return;
+ }
+ Context context = keyWindows.get(window);
+ if (context == null) {
+ LOG.debug("Window {} for key {} already gone.", window, key);
+ return;
+ }
+
+
+ emitWindow(context);
break;
+ }
- case FIRE_AND_PURGE:
- emitWindow(key, window, true);
+ case FIRE_AND_PURGE: {
+ Map<W, Context> keyWindows = windows.get(key);
+ if (keyWindows == null) {
+ LOG.debug("Window {} for key {} already gone.", window, key);
+ return;
+ }
+ Context context = keyWindows.remove(window);
+ if (context == null) {
+ LOG.debug("Window {} for key {} already gone.", window, key);
+ return;
+ }
+ if (keyWindows.isEmpty()) {
+ windows.remove(key);
+ }
+
+ emitWindow(context);
break;
+ }
case CONTINUE:
// ingore
@@ -260,14 +336,14 @@ public class WindowOperator<K, IN, OUT, W extends Window>
}
@Override
- public void processWatermark(Watermark mark) throws Exception {
+ public final void processWatermark(Watermark mark) throws Exception {
Set<Long> toRemove = new HashSet<>();
- for (Map.Entry<Long, Set<TriggerContext>> triggers: watermarkTimers.entrySet()) {
+ for (Map.Entry<Long, Set<Context>> triggers: watermarkTimers.entrySet()) {
if (triggers.getKey() <= mark.getTimestamp()) {
- for (TriggerContext trigger: triggers.getValue()) {
- Trigger.TriggerResult triggerResult = trigger.trigger.onTime(mark.getTimestamp(), trigger);
- processTriggerResult(triggerResult, trigger.key, trigger.window);
+ for (Context context: triggers.getValue()) {
+ Trigger.TriggerResult triggerResult = context.onEventTime(triggers.getKey());
+ processTriggerResult(triggerResult, context.key, context.window);
}
toRemove.add(triggers.getKey());
}
@@ -280,14 +356,14 @@ public class WindowOperator<K, IN, OUT, W extends Window>
}
@Override
- public void trigger(long time) throws Exception {
+ public final void trigger(long time) throws Exception {
Set<Long> toRemove = new HashSet<>();
- for (Map.Entry<Long, Set<TriggerContext>> triggers: processingTimeTimers.entrySet()) {
+ for (Map.Entry<Long, Set<Context>> triggers: processingTimeTimers.entrySet()) {
if (triggers.getKey() < time) {
- for (TriggerContext trigger: triggers.getValue()) {
- Trigger.TriggerResult triggerResult = trigger.trigger.onTime(time, trigger);
- processTriggerResult(triggerResult, trigger.key, trigger.window);
+ for (Context context: triggers.getValue()) {
+ Trigger.TriggerResult triggerResult = context.onProcessingTime(time);
+ processTriggerResult(triggerResult, context.key, context.window);
}
toRemove.add(triggers.getKey());
}
@@ -302,37 +378,146 @@ public class WindowOperator<K, IN, OUT, W extends Window>
* A context object that is given to {@code Trigger} functions to allow them to register
* timer/watermark callbacks.
*/
- protected class TriggerContext implements Trigger.TriggerContext {
- Trigger<? super IN, ? super W> trigger;
- K key;
- W window;
+ protected class Context implements Trigger.TriggerContext {
+ // Key and window this context belongs to.
+ protected K key;
+ protected W window;
+
+ // Buffer holding the elements assigned to this window so far.
+ protected WindowBuffer<IN> windowBuffer;
- public TriggerContext(K key, W window, Trigger<? super IN, ? super W> trigger) {
+ // Per-window trigger state exposed through getKeyValueState(); it is checkpointed
+ // together with the rest of the context in writeToState().
+ protected HashMap<String, Serializable> state;
+
+ // At most one timer of each type is in flight per context (-1 means "none").
+ // When the trigger registers another timer, the stored time is simply overwritten;
+ // for performance the old entry is not removed from the operator's timer sets.
+ // Instead, onEventTime()/onProcessingTime() ignore any firing that does not match
+ // the most recently registered time.
+ protected long watermarkTimer;
+ protected long processingTimeTimer;
+
+ public Context(K key,
+ W window,
+ WindowBuffer<IN> windowBuffer) {
this.key = key;
this.window = window;
- this.trigger = trigger;
+ this.windowBuffer = windowBuffer;
+ state = new HashMap<>();
+
+ this.watermarkTimer = -1;
+ this.processingTimeTimer = -1;
+ }
+
+ /**
+ * Constructs a new {@code Context} by reading from a {@link DataInputView} that
+ * contains a serialized context that we wrote in
+ * {@link #writeToState(StateBackend.CheckpointStateOutputView)}.
+ */
+ @SuppressWarnings("unchecked")
+ protected Context(DataInputView in) throws Exception {
+ this.key = keySerializer.deserialize(in);
+ this.window = windowSerializer.deserialize(in);
+ this.watermarkTimer = in.readLong();
+ this.processingTimeTimer = in.readLong();
+
+ int stateSize = in.readInt();
+ byte[] stateData = new byte[stateSize];
+ // read(byte[]) may return before the array is filled; readFully() consumes exactly
+ // stateSize bytes so the stream stays aligned for the buffered elements below.
+ in.readFully(stateData);
+ ByteArrayInputStream bais = new ByteArrayInputStream(stateData);
+ // NOTE(review): no user-code class loader is passed to SerializationUtils here;
+ // confirm that user-defined state types can be resolved during restore.
+ state = (HashMap<String, Serializable>) SerializationUtils.deserialize(bais);
+
+ this.windowBuffer = windowBufferFactory.create();
+ int numElements = in.readInt();
+ MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
+ for (int i = 0; i < numElements; i++) {
+ windowBuffer.storeElement(recordSerializer.deserialize(in).<IN>asRecord());
+ }
+ }
+
+ /**
+ * Writes the {@code Context} to the given state checkpoint output.
+ *
+ * <p>Layout: key, window, watermark timer, processing-time timer, the length-prefixed
+ * serialized state map, then the element count followed by every buffered element.
+ * {@link #Context(DataInputView)} reads the same layout back.
+ */
+ protected void writeToState(StateBackend.CheckpointStateOutputView out) throws IOException {
+ keySerializer.serialize(key, out);
+ windowSerializer.serialize(window, out);
+ out.writeLong(watermarkTimer);
+ out.writeLong(processingTimeTimer);
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ SerializationUtils.serialize(state, baos);
+ out.writeInt(baos.size());
+ out.write(baos.toByteArray(), 0, baos.size());
+
+ MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
+ out.writeInt(windowBuffer.size());
+ for (StreamRecord<IN> element: windowBuffer.getElements()) {
+ recordSerializer.serialize(element, out);
+ }
+ }
+
+ /**
+ * Returns a view on the entry {@code name} of this context's state map.
+ *
+ * <p>If the entry is absent, {@code value()} stores {@code defaultState} under that
+ * name and returns it; {@code update()} overwrites the entry.
+ * NOTE(review): the default object is stored by reference, so a mutable default
+ * would be shared by every context that falls back to it.
+ */
+ @SuppressWarnings("unchecked")
+ public <S extends Serializable> OperatorState<S> getKeyValueState(final String name, final S defaultState) {
+ return new OperatorState<S>() {
+ @Override
+ public S value() throws IOException {
+ Serializable value = state.get(name);
+ if (value == null) {
+ state.put(name, defaultState);
+ value = defaultState;
+ }
+ return (S) value;
+ }
+
+ @Override
+ public void update(S value) throws IOException {
+ state.put(name, value);
+ }
+ };
}
@Override
public void registerProcessingTimeTimer(long time) {
+ // Re-registering the currently stored time is a no-op. The operator itself is
+ // registered with the runtime only for the first context asking for a given time.
- Set<TriggerContext> triggers = processingTimeTimers.get(time);
+ if (this.processingTimeTimer == time) {
+ // we already have set a trigger for that time
+ return;
+ }
+ Set<Context> triggers = processingTimeTimers.get(time);
if (triggers == null) {
getRuntimeContext().registerTimer(time, WindowOperator.this);
triggers = new HashSet<>();
processingTimeTimers.put(time, triggers);
}
+ this.processingTimeTimer = time;
triggers.add(this);
}
@Override
public void registerWatermarkTimer(long time) {
+ // Re-registering the currently stored time is a no-op; otherwise this context is
+ // added to the set of contexts waiting for a watermark at that time.
- Set<TriggerContext> triggers = watermarkTimers.get(time);
+ if (watermarkTimer == time) {
+ // we already have set a trigger for that time
+ return;
+ }
+ Set<Context> triggers = watermarkTimers.get(time);
if (triggers == null) {
triggers = new HashSet<>();
watermarkTimers.put(time, triggers);
}
+ this.watermarkTimer = time;
triggers.add(this);
}
+
+ /**
+ * Forwards a processing-time firing to the trigger, but only if {@code time} is the
+ * most recently registered processing-time timer; stale timers yield CONTINUE.
+ */
+ public Trigger.TriggerResult onProcessingTime(long time) throws Exception {
+ if (time == processingTimeTimer) {
+ return trigger.onTime(time, this);
+ } else {
+ return Trigger.TriggerResult.CONTINUE;
+ }
+ }
+
+ /**
+ * Forwards an event-time (watermark) firing to the trigger, but only if {@code time}
+ * is the most recently registered watermark timer; stale timers yield CONTINUE.
+ */
+ public Trigger.TriggerResult onEventTime(long time) throws Exception {
+ if (time == watermarkTimer) {
+ return trigger.onTime(time, this);
+ } else {
+ return Trigger.TriggerResult.CONTINUE;
+ }
+ }
}
/**
@@ -347,7 +532,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
}
@Override
- public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
+ public final void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
if (userFunction instanceof OutputTypeConfigurable) {
@SuppressWarnings("unchecked")
OutputTypeConfigurable<OUT> typeConfigurable = (OutputTypeConfigurable<OUT>) userFunction;
@@ -356,6 +541,60 @@ public class WindowOperator<K, IN, OUT, W extends Window>
}
// ------------------------------------------------------------------------
+ // Checkpointing
+ // ------------------------------------------------------------------------
+
+ /**
+ * Snapshots the base operator state, then writes every window {@code Context}
+ * (grouped by key) to a checkpoint stream whose handle becomes the operator state.
+ */
+ @Override
+ public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+ StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+
+ // Layout: number of keys, then for each key the number of windows followed by
+ // each Context as written by Context#writeToState() (which includes the key).
+ StateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+
+ int numKeys = windows.size();
+ out.writeInt(numKeys);
+
+ for (Map.Entry<K, Map<W, Context>> keyWindows: windows.entrySet()) {
+ int numWindows = keyWindows.getValue().size();
+ out.writeInt(numWindows);
+ for (Context context: keyWindows.getValue().values()) {
+ context.writeToState(out);
+ }
+ }
+
+ taskState.setOperatorState(out.closeAndGetHandle());
+ return taskState;
+ }
+
+ /**
+ * Restores the window contexts written by {@code snapshotOperatorState()} and
+ * re-registers the timers they had in flight.
+ */
+ @Override
+ public void restoreState(StreamTaskState taskState) throws Exception {
+ super.restoreState(taskState);
+
+ @SuppressWarnings("unchecked")
+ StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
+ DataInputView in = inputState.getState(getUserCodeClassloader());
+
+ // Layout: number of keys, then per key the number of windows followed by each
+ // serialized Context.
+ int numKeys = in.readInt();
+ this.windows = new HashMap<>(numKeys);
+ this.processingTimeTimers = new HashMap<>();
+ this.watermarkTimers = new HashMap<>();
+
+ for (int i = 0; i < numKeys; i++) {
+ int numWindows = in.readInt();
+ for (int j = 0; j < numWindows; j++) {
+ Context context = new Context(in);
+ Map<W, Context> keyWindows = windows.get(context.key);
+ if (keyWindows == null) {
+ keyWindows = new HashMap<>(numWindows);
+ windows.put(context.key, keyWindows);
+ }
+ keyWindows.put(context.window, context);
+ // The timer maps were just reset, so the timers this context had in flight
+ // must be registered again or the restored window would never fire on time.
+ reregisterTimers(context);
+ }
+ }
+ }
+
+ /**
+ * Re-registers the watermark and processing-time timers recorded in a restored
+ * {@code Context} ({@code -1} means no timer was in flight).
+ *
+ * <p>The stored time is cleared first because the {@code register*Timer} methods
+ * return early when asked for the time that is already stored.
+ * NOTE(review): the processing-time path goes through getRuntimeContext().registerTimer();
+ * confirm the runtime context is usable while restoreState() runs.
+ */
+ private void reregisterTimers(Context context) {
+ long watermarkTime = context.watermarkTimer;
+ if (watermarkTime != -1) {
+ context.watermarkTimer = -1;
+ context.registerWatermarkTimer(watermarkTime);
+ }
+ long processingTime = context.processingTimeTimer;
+ if (processingTime != -1) {
+ context.processingTimeTimer = -1;
+ context.registerProcessingTimeTimer(processingTime);
+ }
+ }
+
+ // ------------------------------------------------------------------------
// Getters for testing
// ------------------------------------------------------------------------
@@ -365,8 +604,8 @@ public class WindowOperator<K, IN, OUT, W extends Window>
}
+ /** Returns the single trigger this operator evaluates for every window context. */
@VisibleForTesting
- public Trigger<? super IN, ? super W> getTriggerTemplate() {
- return triggerTemplate;
+ public Trigger<? super IN, ? super W> getTrigger() {
+ return trigger;
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 4fa16ac..45ef29f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -71,7 +71,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
Assert.assertFalse(winOperator1.isSetProcessingTime());
- Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger);
+ Assert.assertTrue(winOperator1.getTrigger() instanceof WatermarkTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
@@ -94,7 +94,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
Assert.assertFalse(winOperator2.isSetProcessingTime());
- Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof WatermarkTrigger);
+ Assert.assertTrue(winOperator2.getTrigger() instanceof WatermarkTrigger);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
}
@@ -119,7 +119,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
Assert.assertTrue(winOperator1.isSetProcessingTime());
- Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof CountTrigger);
+ Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
@@ -143,7 +143,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
Assert.assertTrue(winOperator1.isSetProcessingTime());
- Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
+ Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
}
@@ -168,7 +168,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
Assert.assertTrue(operator1 instanceof EvictingNonKeyedWindowOperator);
EvictingNonKeyedWindowOperator winOperator1 = (EvictingNonKeyedWindowOperator) operator1;
Assert.assertFalse(winOperator1.isSetProcessingTime());
- Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger);
+ Assert.assertTrue(winOperator1.getTrigger() instanceof WatermarkTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
@@ -194,7 +194,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
Assert.assertTrue(operator2 instanceof EvictingNonKeyedWindowOperator);
EvictingNonKeyedWindowOperator winOperator2 = (EvictingNonKeyedWindowOperator) operator2;
Assert.assertFalse(winOperator2.isSetProcessingTime());
- Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
+ Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
index 3139941..39033cc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
@@ -54,6 +54,7 @@ public class EvictingNonKeyedWindowOperatorTest {
EvictingNonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingNonKeyedWindowOperator<>(
GlobalWindows.create(),
+ new GlobalWindow.Serializer(),
new HeapWindowBuffer.Factory<Tuple2<String, Integer>>(),
new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer(closeCalled)),
CountTrigger.of(WINDOW_SLIDE),
http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 3d9605e..afc65d5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -17,8 +17,10 @@
*/
package org.apache.flink.streaming.runtime.operators.windowing;
+import jdk.nashorn.internal.objects.Global;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
@@ -54,7 +56,9 @@ public class EvictingWindowOperatorTest {
EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
GlobalWindows.create(),
+ new GlobalWindow.Serializer(),
new TupleKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
new HeapWindowBuffer.Factory<Tuple2<String, Integer>>(),
new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer(closeCalled)),
CountTrigger.of(WINDOW_SLIDE),
http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
index 6cc8931..a91d957 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
@@ -76,6 +76,7 @@ public class NonKeyedWindowOperatorTest {
NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
+ new TimeWindow.Serializer(),
windowBufferFactory,
new ReduceAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
WatermarkTrigger.create());
@@ -156,6 +157,7 @@ public class NonKeyedWindowOperatorTest {
NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ new TimeWindow.Serializer(),
windowBufferFactory,
new ReduceAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
WatermarkTrigger.create());
@@ -234,6 +236,7 @@ public class NonKeyedWindowOperatorTest {
NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>(
GlobalWindows.create(),
+ new GlobalWindow.Serializer(),
windowBufferFactory,
new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
ContinuousWatermarkTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
@@ -312,6 +315,7 @@ public class NonKeyedWindowOperatorTest {
NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>(
GlobalWindows.create(),
+ new GlobalWindow.Serializer(),
windowBufferFactory,
new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)));
http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index d387df0..e825b88 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
@@ -76,7 +77,9 @@ public class WindowOperatorTest {
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
+ new TimeWindow.Serializer(),
new TupleKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
windowBufferFactory,
new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
WatermarkTrigger.create());
@@ -163,7 +166,9 @@ public class WindowOperatorTest {
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ new TimeWindow.Serializer(),
new TupleKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
windowBufferFactory,
new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
WatermarkTrigger.create());
@@ -246,7 +251,9 @@ public class WindowOperatorTest {
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
GlobalWindows.create(),
+ new GlobalWindow.Serializer(),
new TupleKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
windowBufferFactory,
new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
ContinuousWatermarkTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
@@ -331,7 +338,9 @@ public class WindowOperatorTest {
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
GlobalWindows.create(),
+ new GlobalWindow.Serializer(),
new TupleKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
windowBufferFactory,
new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)));
http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 10fe734..02ec820 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -116,7 +116,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
Assert.assertTrue(operator1 instanceof WindowOperator);
WindowOperator winOperator1 = (WindowOperator) operator1;
Assert.assertFalse(winOperator1.isSetProcessingTime());
- Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger);
+ Assert.assertTrue(winOperator1.getTrigger() instanceof WatermarkTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
@@ -140,7 +140,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
Assert.assertTrue(operator2 instanceof WindowOperator);
WindowOperator winOperator2 = (WindowOperator) operator2;
Assert.assertFalse(winOperator2.isSetProcessingTime());
- Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof WatermarkTrigger);
+ Assert.assertTrue(winOperator2.getTrigger() instanceof WatermarkTrigger);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
}
@@ -166,7 +166,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
Assert.assertTrue(operator1 instanceof WindowOperator);
WindowOperator winOperator1 = (WindowOperator) operator1;
Assert.assertTrue(winOperator1.isSetProcessingTime());
- Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof CountTrigger);
+ Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
@@ -191,7 +191,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
Assert.assertTrue(operator2 instanceof WindowOperator);
WindowOperator winOperator2 = (WindowOperator) operator2;
Assert.assertTrue(winOperator2.isSetProcessingTime());
- Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
+ Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
}
@@ -217,7 +217,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
Assert.assertTrue(operator1 instanceof EvictingWindowOperator);
EvictingWindowOperator winOperator1 = (EvictingWindowOperator) operator1;
Assert.assertFalse(winOperator1.isSetProcessingTime());
- Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger);
+ Assert.assertTrue(winOperator1.getTrigger() instanceof WatermarkTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
@@ -244,7 +244,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
Assert.assertTrue(operator2 instanceof EvictingWindowOperator);
EvictingWindowOperator winOperator2 = (EvictingWindowOperator) operator2;
Assert.assertFalse(winOperator2.isSetProcessingTime());
- Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
+ Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index 950b0f5..60b7894 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.examples.windowing;
+import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -97,8 +98,7 @@ public class SessionWindowing {
private static final long serialVersionUID = 1L;
- private volatile Long lastSeenEvent = 1L;
- private Long sessionTimeout;
+ private final Long sessionTimeout;
public SessionTrigger(Long sessionTimeout) {
this.sessionTimeout = sessionTimeout;
@@ -106,13 +106,17 @@ public class SessionWindowing {
}
@Override
- public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) {
- Long timeSinceLastEvent = timestamp - lastSeenEvent;
+ public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
+
+ OperatorState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L);
+ Long lastSeen = lastSeenState.value();
+
+ Long timeSinceLastEvent = timestamp - lastSeen;
// Update the last seen event time
- lastSeenEvent = timestamp;
+ lastSeenState.update(timestamp);
- ctx.registerWatermarkTimer(lastSeenEvent + sessionTimeout);
+ // Arm the timeout relative to the element just seen (the updated last-seen time),
+ // as the previous version did. Arming it from the previous element could make
+ // onTime() see less than sessionTimeout elapsed and leave the session open.
+ ctx.registerWatermarkTimer(timestamp + sessionTimeout);
if (timeSinceLastEvent > sessionTimeout) {
return TriggerResult.FIRE_AND_PURGE;
@@ -122,17 +126,15 @@ public class SessionWindowing {
}
@Override
- public TriggerResult onTime(long time, TriggerContext ctx) {
- if (time - lastSeenEvent >= sessionTimeout) {
+ public TriggerResult onTime(long time, TriggerContext ctx) throws Exception {
+ OperatorState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L);
+ Long lastSeen = lastSeenState.value();
+
+ if (time - lastSeen >= sessionTimeout) {
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
-
- @Override
- public SessionTrigger duplicate() {
- return new SessionTrigger(sessionTimeout);
- }
}
// *************************************************************************
http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 33104ab..0357144 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -202,6 +202,58 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
javaStream.apply(applyFunction, implicitly[TypeInformation[R]])
}
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window; because this stream is not keyed, there is no per-key evaluation.
+ * The output of the window function is interpreted as a regular non-windowed stream.
+ *
+ * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+ *
+ * @param preAggregator The reduce function that is used for pre-aggregation
+ * @param function The window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ def apply[R: TypeInformation: ClassTag](
+ preAggregator: ReduceFunction[T],
+ function: AllWindowFunction[T, R, W]): DataStream[R] = {
+ javaStream.apply(clean(preAggregator), clean(function), implicitly[TypeInformation[R]])
+ }
+
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window; because this stream is not keyed, there is no per-key evaluation.
+ * The output of the window function is interpreted as a regular non-windowed stream.
+ *
+ * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+ *
+ * @param preAggregator The reduce function that is used for pre-aggregation
+ * @param function The window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ def apply[R: TypeInformation: ClassTag](
+ preAggregator: (T, T) => T,
+ function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
+ if (preAggregator == null) {
+ throw new NullPointerException("Reduce function must not be null.")
+ }
+ if (function == null) {
+ throw new NullPointerException("WindowApply function must not be null.")
+ }
+
+ val cleanReducer = clean(preAggregator)
+ val reducer = new ReduceFunction[T] {
+ def reduce(v1: T, v2: T) = { cleanReducer(v1, v2) }
+ }
+
+ val cleanApply = clean(function)
+ val applyFunction = new AllWindowFunction[T, R, W] {
+ def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
+ cleanApply(window, elements.asScala, out)
+ }
+ }
+ javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]])
+ }
+
// ------------------------------------------------------------------------
// Aggregations on the keyed windows
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index d4f4618..93b91ff 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -196,6 +196,10 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
*/
def apply[R: TypeInformation: ClassTag](
function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
+ if (function == null) {
+ throw new NullPointerException("WindowApply function must not be null.")
+ }
+
val cleanedFunction = clean(function)
val applyFunction = new WindowFunction[T, R, K, W] {
def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
@@ -205,6 +209,58 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
javaStream.apply(applyFunction, implicitly[TypeInformation[R]])
}
+ /**
+ * Evaluates the given window function on every window, separately for each key. Elements are
+ * combined with the pre-aggregation reducer as they arrive, and whatever the window function
+ * emits forms a regular, non-windowed stream.
+ *
+ * @param preAggregator The reduce function that is used for pre-aggregation
+ * @param function The window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ def apply[R: TypeInformation: ClassTag](
+ preAggregator: ReduceFunction[T],
+ function: WindowFunction[T, R, K, W]): DataStream[R] = {
+ val cleanedReducer = clean(preAggregator)
+ val cleanedFunction = clean(function)
+ javaStream.apply(cleanedReducer, cleanedFunction, implicitly[TypeInformation[R]])
+ }
+
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+ *
+ * @param preAggregator The reduce function that is used for pre-aggregation
+ * @param function The window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ def apply[R: TypeInformation: ClassTag](
+ preAggregator: (T, T) => T,
+ function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
+ if (preAggregator == null) {
+ throw new NullPointerException("Reduce function must not be null.")
+ }
+ if (function == null) {
+ throw new NullPointerException("WindowApply function must not be null.")
+ }
+
+ val cleanReducer = clean(preAggregator)
+ val reducer = new ReduceFunction[T] {
+ def reduce(v1: T, v2: T) = { cleanReducer(v1, v2) }
+ }
+
+ val cleanApply = clean(function)
+ val applyFunction = new WindowFunction[T, R, K, W] {
+ def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
+ cleanApply(key, window, elements.asScala, out)
+ }
+ }
+ javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]])
+ }
+
// ------------------------------------------------------------------------
// Aggregations on the keyed windows
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index 99fcd07..7da7bc3 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -22,8 +22,9 @@ package org.apache.flink.streaming.api.scala
import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.functions.RichReduceFunction
+import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
+import org.apache.flink.streaming.api.functions.windowing.{WindowFunction, AllWindowFunction}
import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.streaming.api.windowing.assigners.{TumblingTimeWindows, SlidingTimeWindows}
import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
@@ -111,7 +112,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
- assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+ assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
assertTrue(
winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
@@ -134,7 +135,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
- assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+ assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
}
@@ -161,7 +162,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
assertTrue(operator1.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]])
val winOperator1 = operator1.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
- assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[ProcessingTimeTrigger])
+ assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger])
assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
@@ -185,11 +186,72 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
assertTrue(operator2.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]])
val winOperator2 = operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
- assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+ assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
}
+
+ /**
+ * Checks that windowAll(...).apply(reducer, allWindowFunction) is translated into a
+ * [[NonKeyedWindowOperator]] that uses a pre-aggregating window buffer.
+ */
+ @Test
+ def testPreReduce(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val reducer = new DummyReducer
+
+ val window1 = source
+ .windowAll(SlidingTimeWindows.of(
+ Time.of(1, TimeUnit.SECONDS),
+ Time.of(100, TimeUnit.MILLISECONDS)))
+ .trigger(CountTrigger.of(100))
+ .apply(reducer, new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
+ def apply(
+ window: TimeWindow,
+ values: java.lang.Iterable[(String, Int)],
+ out: Collector[(String, Int)]) { }
+ })
+
+ val transform1 = window1.getJavaStream.getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator1 = transform1.getOperator
+
+ assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
+ val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
+ assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
+ assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
+ assertTrue(
+ winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+
+
+ val window2 = source
+ .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .trigger(CountTrigger.of(100))
+ .apply(reducer, new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
+ def apply(
+ window: TimeWindow,
+ values: java.lang.Iterable[(String, Int)],
+ out: Collector[(String, Int)]) { }
+ })
+
+ val transform2 = window2.getJavaStream.getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator2 = transform2.getOperator
+
+ assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
+ val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
+ assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
+ assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
+ assertTrue(
+ winOperator2.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+ }
+
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index 65f978c..46981ab 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -108,7 +108,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]])
val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]]
- assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+ assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
assertTrue(
winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
@@ -133,7 +133,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]])
val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]]
- assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+ assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
}
@@ -161,7 +161,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
assertTrue(operator1.isInstanceOf[EvictingWindowOperator[_, _, _, _]])
val winOperator1 = operator1.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
- assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[ProcessingTimeTrigger])
+ assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger])
assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
@@ -187,9 +187,69 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
assertTrue(operator2.isInstanceOf[EvictingWindowOperator[_, _, _, _]])
val winOperator2 = operator2.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
- assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+ assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
}
+
+ /**
+ * Checks that apply(reducer, windowFunction) on keyed windows is translated into a
+ * [[WindowOperator]] whose window buffer factory is the pre-aggregating one.
+ */
+ @Test
+ def testPreReduce(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+ val input = env.fromElements(("hello", 1), ("hello", 2))
+ val reducer = new DummyReducer
+
+ // sliding time windows
+ val slidingResult = input
+ .keyBy(0)
+ .window(SlidingTimeWindows.of(
+ Time.of(1, TimeUnit.SECONDS),
+ Time.of(100, TimeUnit.MILLISECONDS)))
+ .trigger(CountTrigger.of(100))
+ .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
+ def apply(
+ key: Tuple,
+ window: TimeWindow,
+ values: java.lang.Iterable[(String, Int)],
+ out: Collector[(String, Int)]) { }
+ })
+
+ val slidingOperator = slidingResult.getJavaStream.getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+ .getOperator
+ assertTrue(slidingOperator.isInstanceOf[WindowOperator[_, _, _, _]])
+ val slidingOp = slidingOperator.asInstanceOf[WindowOperator[_, _, _, _]]
+ assertTrue(slidingOp.getTrigger.isInstanceOf[CountTrigger[_]])
+ assertTrue(slidingOp.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
+ assertTrue(
+ slidingOp.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+
+ // tumbling time windows
+ val tumblingResult = input
+ .keyBy(0)
+ .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .trigger(CountTrigger.of(100))
+ .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
+ def apply(
+ key: Tuple,
+ window: TimeWindow,
+ values: java.lang.Iterable[(String, Int)],
+ out: Collector[(String, Int)]) { }
+ })
+
+ val tumblingOperator = tumblingResult.getJavaStream.getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+ .getOperator
+ assertTrue(tumblingOperator.isInstanceOf[WindowOperator[_, _, _, _]])
+ val tumblingOp = tumblingOperator.asInstanceOf[WindowOperator[_, _, _, _]]
+ assertTrue(tumblingOp.getTrigger.isInstanceOf[CountTrigger[_]])
+ assertTrue(tumblingOp.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
+ assertTrue(
+ tumblingOp.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+ }
}