You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/22 16:01:53 UTC
flink git commit: [FLINK-2891] Set KV-State key upon Window Evaluation in General Windows
Repository: flink
Updated Branches:
refs/heads/master 56146ae85 -> 201f55b85
[FLINK-2891] Set KV-State key upon Window Evaluation in General Windows
Previously, the KV-State key was not set before the WindowFunction was
invoked, leading to incorrect results if KV-State was used in the WindowFunction.
This commit also adds a test.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/201f55b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/201f55b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/201f55b8
Branch: refs/heads/master
Commit: 201f55b85fd89e10c36a697a7e895fdbd02e53a1
Parents: 56146ae
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Oct 22 11:22:03 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Oct 22 15:53:57 2015 +0200
----------------------------------------------------------------------
.../windowing/NonKeyedWindowOperator.java | 10 +-
.../operators/windowing/WindowOperator.java | 13 +-
.../EventTimeWindowCheckpointingITCase.java | 149 +++++++++++++++++++
3 files changed, 164 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/201f55b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 5de6cd1..101c818 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -241,10 +241,12 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
protected void emitWindow(Context context) throws Exception {
+ // Everything the user function emits for this window is stamped with the window's max timestamp.
timestampedCollector.setTimestamp(context.window.maxTimestamp());
- userFunction.apply(
- context.window,
- context.windowBuffer.getUnpackedElements(),
- timestampedCollector);
+ // Only evaluate non-empty windows: when the window buffer holds no elements the
+ // user function is not invoked, so this method emits nothing for an empty window.
+ if (context.windowBuffer.size() > 0) {
+ userFunction.apply(
+ context.window,
+ context.windowBuffer.getUnpackedElements(),
+ timestampedCollector);
+ }
}
private void processTriggerResult(Trigger.TriggerResult triggerResult, W window) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/201f55b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 2491c57..04c393c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -286,10 +286,15 @@ public class WindowOperator<K, IN, OUT, W extends Window>
protected void emitWindow(Context context) throws Exception {
+ // Everything the user function emits for this window is stamped with the window's max timestamp.
timestampedCollector.setTimestamp(context.window.maxTimestamp());
- userFunction.apply(context.key,
- context.window,
- context.windowBuffer.getUnpackedElements(),
- timestampedCollector);
+
+ // Only evaluate non-empty windows; a buffered element is also needed below to set the key.
+ if (context.windowBuffer.size() > 0) {
+ // Set the operator's key context from a buffered element BEFORE invoking the user
+ // function, so that KV-State accessed inside the WindowFunction refers to this
+ // window's key (without this, KV-State reads/writes used a stale key).
+ // NOTE(review): assumes every element in this buffer has key context.key — confirm.
+ setKeyContextElement(context.windowBuffer.getElements().iterator().next());
+
+ userFunction.apply(context.key,
+ context.window,
+ context.windowBuffer.getUnpackedElements(),
+ timestampedCollector);
+ }
}
private void processTriggerResult(Trigger.TriggerResult triggerResult, K key, W window) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/201f55b8/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 6cf04f5..975582b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -19,6 +19,7 @@
package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
@@ -147,6 +148,72 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
}
@Test
+    public void testTumblingTimeWindowWithKVState() {
+        // Verifies that KV-State used inside a WindowFunction is scoped to the window's key:
+        // each key's counter is seeded with the key itself and incremented once per window,
+        // and CountValidatingSink checks the emitted (key + windows so far) values.
+        final int NUM_ELEMENTS_PER_KEY = 3000;
+        final int WINDOW_SIZE = 100;
+        final int NUM_KEYS = 100;
+        FailingSource.reset();
+
+        try {
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+                    "localhost", cluster.getLeaderRPCPort());
+
+            env.setParallelism(PARALLELISM);
+            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+            env.enableCheckpointing(100);
+            env.setNumberOfExecutionRetries(3);
+            env.getConfig().disableSysoutLogging();
+
+            env
+                .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+                .rebalance()
+                .keyBy(0)
+                .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS))
+                .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+                    private boolean open = false;
+
+                    private OperatorState<Integer> count;
+
+                    @Override
+                    public void open(Configuration parameters) {
+                        assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+                        open = true;
+                        count = getRuntimeContext().getKeyValueState("count", Integer.class, 0);
+                    }
+
+                    @Override
+                    public void apply(
+                            Tuple tuple,
+                            TimeWindow window,
+                            Iterable<Tuple2<Long, IntType>> values,
+                            Collector<Tuple4<Long, Long, Long, IntType>> out) throws Exception {
+
+                        // validate that the function has been opened properly; checked first,
+                        // because the 'count' state handle is only initialized in open()
+                        assertTrue(open);
+
+                        // the window count state starts with the key, so that we get
+                        // different count results for each key
+                        if (count.value() == 0) {
+                            count.update(tuple.<Long>getField(0).intValue());
+                        }
+
+                        count.update(count.value() + 1);
+                        out.collect(new Tuple4<>(tuple.<Long>getField(0), window.getStart(), window.getEnd(), new IntType(count.value())));
+                    }
+                })
+                .addSink(new CountValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
+
+            tryExecute(env, "Tumbling Window With KV State Test");
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+ @Test
public void testSlidingTimeWindow() {
final int NUM_ELEMENTS_PER_KEY = 3000;
final int WINDOW_SIZE = 1000;
@@ -560,6 +627,88 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
}
}
+    /**
+     * Sink for validating the stateful window counts. Must run with parallelism 1.
+     *
+     * <p>For every {@code (key, windowStart, windowEnd, count)} record it increments the number
+     * of windows seen for that key and asserts that the emitted count equals
+     * {@code key + windowsSeenSoFar}. Once every key has reached the expected number of windows
+     * it throws a {@link SuccessException}; {@link #close()} fails if not every key reached
+     * that number. The per-key window counts are checkpointed via {@link Checkpointed}.
+     */
+    private static class CountValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
+            implements Checkpointed<HashMap<Long, Integer>> {
+
+        /** Number of windows seen so far, per key. */
+        private final HashMap<Long, Integer> windowCounts = new HashMap<>();
+
+        private final int numKeys;
+        private final int numWindowsExpected;
+
+        private CountValidatingSink(int numKeys, int numWindowsExpected) {
+            this.numKeys = numKeys;
+            this.numWindowsExpected = numWindowsExpected;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            // this sink can only work with DOP 1
+            assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+        }
+
+        @Override
+        public void close() throws Exception {
+            // every key must be present (a missing key means its windows were not all seen)
+            // and every key must have reached the expected number of windows
+            boolean seenAll = windowCounts.size() == numKeys;
+            if (seenAll) {
+                for (Integer windowCount : windowCounts.values()) {
+                    if (windowCount < numWindowsExpected) {
+                        seenAll = false;
+                        break;
+                    }
+                }
+            }
+            assertTrue("The sink must see all expected windows.", seenAll);
+        }
+
+        @Override
+        public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
+            Integer curr = windowCounts.get(value.f0);
+            int windowsSoFar = (curr == null) ? 1 : curr + 1;
+            windowCounts.put(value.f0, windowsSoFar);
+
+            // verify the contents of that window, the contents should be:
+            // (key + num windows so far)
+            assertEquals("Window counts don't match for key " + value.f0 + ".",
+                    value.f0.intValue() + windowsSoFar, value.f3.value);
+
+            boolean seenAll = true;
+            if (windowCounts.size() == numKeys) {
+                for (Integer windowCount : windowCounts.values()) {
+                    if (windowCount < numWindowsExpected) {
+                        seenAll = false;
+                        break;
+                    } else if (windowCount > numWindowsExpected) {
+                        fail("Window count too high: " + windowCount);
+                    }
+                }
+
+                if (seenAll) {
+                    // exit
+                    throw new SuccessException();
+                }
+            }
+        }
+
+        @Override
+        public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) {
+            // defensive copy: the snapshot must not change if invoke() keeps mutating
+            // windowCounts while the returned object is still referenced
+            return new HashMap<>(this.windowCounts);
+        }
+
+        @Override
+        public void restoreState(HashMap<Long, Integer> state) {
+            this.windowCounts.putAll(state);
+        }
+    }
+
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------