You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/22 16:01:53 UTC

flink git commit: [FLINK-2891] Set KV-State key upon Window Evaluation in General Windows

Repository: flink
Updated Branches:
  refs/heads/master 56146ae85 -> 201f55b85


[FLINK-2891] Set KV-State key upon Window Evaluation in General Windows

Previously, the KV-State key was not set when a window was evaluated,
leading to incorrect results if KV-State was used in the WindowFunction.

This also adds a test.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/201f55b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/201f55b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/201f55b8

Branch: refs/heads/master
Commit: 201f55b85fd89e10c36a697a7e895fdbd02e53a1
Parents: 56146ae
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Oct 22 11:22:03 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Oct 22 15:53:57 2015 +0200

----------------------------------------------------------------------
 .../windowing/NonKeyedWindowOperator.java       |  10 +-
 .../operators/windowing/WindowOperator.java     |  13 +-
 .../EventTimeWindowCheckpointingITCase.java     | 149 +++++++++++++++++++
 3 files changed, 164 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/201f55b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 5de6cd1..101c818 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -241,10 +241,12 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	protected void emitWindow(Context context) throws Exception {
 		timestampedCollector.setTimestamp(context.window.maxTimestamp());
 
-		userFunction.apply(
-				context.window,
-				context.windowBuffer.getUnpackedElements(),
-				timestampedCollector);
+		if (context.windowBuffer.size() > 0) {
+			userFunction.apply(
+					context.window,
+					context.windowBuffer.getUnpackedElements(),
+					timestampedCollector);
+		}
 	}
 
 	private void processTriggerResult(Trigger.TriggerResult triggerResult, W window) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/201f55b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 2491c57..04c393c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -286,10 +286,15 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	protected void emitWindow(Context context) throws Exception {
 		timestampedCollector.setTimestamp(context.window.maxTimestamp());
 
-		userFunction.apply(context.key,
-				context.window,
-				context.windowBuffer.getUnpackedElements(),
-				timestampedCollector);
+
+		if (context.windowBuffer.size() > 0) {
+			setKeyContextElement(context.windowBuffer.getElements().iterator().next());
+
+			userFunction.apply(context.key,
+					context.window,
+					context.windowBuffer.getUnpackedElements(),
+					timestampedCollector);
+		}
 	}
 
 	private void processTriggerResult(Trigger.TriggerResult triggerResult, K key, W window) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/201f55b8/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 6cf04f5..975582b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
@@ -147,6 +148,72 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 	}
 
 	@Test
+	public void testTumblingTimeWindowWithKVState() {
+		final int NUM_ELEMENTS_PER_KEY = 3000;
+		final int WINDOW_SIZE = 100;
+		final int NUM_KEYS = 100;
+		FailingSource.reset();
+
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+					"localhost", cluster.getLeaderRPCPort());
+
+			env.setParallelism(PARALLELISM);
+			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+			env.enableCheckpointing(100);
+			env.setNumberOfExecutionRetries(3);
+			env.getConfig().disableSysoutLogging();
+
+			env
+					.addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+					.rebalance()
+					.keyBy(0)
+					.timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS))
+					.apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+						private boolean open = false;
+
+						private OperatorState<Integer> count;
+
+						@Override
+						public void open(Configuration parameters) {
+							assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+							open = true;
+							count = getRuntimeContext().getKeyValueState("count", Integer.class, 0);
+						}
+
+						@Override
+						public void apply(
+								Tuple tuple,
+								TimeWindow window,
+								Iterable<Tuple2<Long, IntType>> values,
+								Collector<Tuple4<Long, Long, Long, IntType>> out) throws Exception {
+
+							// the window count state starts with the key, so that we get
+							// different count results for each key
+							if (count.value() == 0) {
+								count.update(tuple.<Long>getField(0).intValue());
+							}
+
+							// validate that the function has been opened properly
+							assertTrue(open);
+
+							count.update(count.value() + 1);
+							out.collect(new Tuple4<>(tuple.<Long>getField(0), window.getStart(), window.getEnd(), new IntType(count.value())));
+						}
+					})
+					.addSink(new CountValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
+
+
+			tryExecute(env, "Tumbling Window Test");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
 	public void testSlidingTimeWindow() {
 		final int NUM_ELEMENTS_PER_KEY = 3000;
 		final int WINDOW_SIZE = 1000;
@@ -560,6 +627,88 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 		}
 	}
 
+	// Sink for validating the stateful window counts
+	private static class CountValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
+			implements Checkpointed<HashMap<Long, Integer>> {
+
+		private final HashMap<Long, Integer> windowCounts = new HashMap<>();
+
+		private final int numKeys;
+		private final int numWindowsExpected;
+
+		private CountValidatingSink(int numKeys, int numWindowsExpected) {
+			this.numKeys = numKeys;
+			this.numWindowsExpected = numWindowsExpected;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			// this sink can only work with DOP 1
+			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+		}
+
+		@Override
+		public void close() throws Exception {
+			boolean seenAll = true;
+			if (windowCounts.size() == numKeys) {
+				for (Integer windowCount: windowCounts.values()) {
+					if (windowCount < numWindowsExpected) {
+						seenAll = false;
+						break;
+					}
+				}
+			}
+			assertTrue("The source must see all expected windows.", seenAll);
+		}
+
+		@Override
+		public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
+
+			Integer curr = windowCounts.get(value.f0);
+			if (curr != null) {
+				windowCounts.put(value.f0, curr + 1);
+			}
+			else {
+				windowCounts.put(value.f0, 1);
+			}
+
+
+			// verify the contents of that window, the contents should be:
+			// (key + num windows so far)
+
+			assertEquals("Window counts don't match for key " + value.f0 + ".", value.f0.intValue() + windowCounts.get(value.f0), value.f3.value);
+
+			boolean seenAll = true;
+			if (windowCounts.size() == numKeys) {
+				for (Integer windowCount: windowCounts.values()) {
+					if (windowCount < numWindowsExpected) {
+						seenAll = false;
+						break;
+					} else if (windowCount > numWindowsExpected) {
+						fail("Window count to high: " + windowCount);
+					}
+				}
+
+				if (seenAll) {
+					// exit
+					throw new SuccessException();
+				}
+
+			}
+		}
+
+		@Override
+		public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) {
+			return this.windowCounts;
+		}
+
+		@Override
+		public void restoreState(HashMap<Long, Integer> state) {
+			this.windowCounts.putAll(state);
+		}
+	}
+
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------