You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/10/04 11:02:12 UTC

flink git commit: [FLINK-2802] [streaming] Remove cyclic watermark dependencies for iterations

Repository: flink
Updated Branches:
  refs/heads/master 6e0e67d2e -> 88a977689


[FLINK-2802] [streaming] Remove cyclic watermark dependencies for iterations

Closes #1216


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/88a97768
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/88a97768
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/88a97768

Branch: refs/heads/master
Commit: 88a977689aeec67a2e67eb0a1449abef407ed6ec
Parents: 6e0e67d
Author: Gyula Fora <gy...@apache.org>
Authored: Fri Oct 2 19:57:01 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Sun Oct 4 09:33:02 2015 +0200

----------------------------------------------------------------------
 .../runtime/io/StreamInputProcessor.java        |  3 +-
 .../runtime/tasks/StreamIterationHead.java      |  9 +++-
 .../api/streamtask/StreamIterationHeadTest.java | 51 ++++++++++++++++++++
 .../runtime/tasks/StreamMockEnvironment.java    |  4 +-
 4 files changed, 62 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/88a97768/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index f50ddcd..80563b8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -79,7 +79,7 @@ public class StreamInputProcessor<IN> {
 
 	private final DeserializationDelegate<StreamElement> deserializationDelegate;
 
-	@SuppressWarnings({"unchecked", "rawtypes"})
+	@SuppressWarnings("unchecked")
 	public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
 								EventListener<CheckpointBarrier> checkpointListener,
 								CheckpointingMode checkpointMode,
@@ -125,7 +125,6 @@ public class StreamInputProcessor<IN> {
 		lastEmittedWatermark = Long.MIN_VALUE;
 	}
 
-	@SuppressWarnings("unchecked")
 	public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator, Object lock) throws Exception {
 		if (isFinished) {
 			return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/88a97768/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index 2ad2d2d..c937e51 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -23,10 +23,10 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,6 +64,13 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
 			Collection<RecordWriterOutput<OUT>> outputs = 
 					(Collection<RecordWriterOutput<OUT>>) (Collection<?>) outputHandler.getOutputs();
 
+			// If timestamps are enabled we make sure to remove cyclic watermark dependencies
+			if (getExecutionConfig().areTimestampsEnabled()) {
+				for (RecordWriterOutput<OUT> output : outputs) {
+					output.emitWatermark(new Watermark(Long.MAX_VALUE));
+				}
+			}
+
 			while (running) {
 				StreamRecord<OUT> nextRecord = shouldWait ?
 					dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS) :

http://git-wip-us.apache.org/repos/asf/flink/blob/88a97768/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
new file mode 100644
index 0000000..8f5f8df
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamtask;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.*;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ ResultPartitionWriter.class })
+public class StreamIterationHeadTest {
+
+	@Test
+	public void testIterationHeadWatermarkEmission() throws Exception {
+		StreamIterationHead<Integer> head = new StreamIterationHead<>();
+		StreamTaskTestHarness<Integer> harness = new StreamTaskTestHarness<>(head,
+				BasicTypeInfo.INT_TYPE_INFO);
+		harness.getStreamConfig().setIterationId("1");
+		harness.getStreamConfig().setIterationWaitTime(1);
+
+		harness.invoke();
+		harness.waitForTaskCompletion();
+
+		assertEquals(1, harness.getOutput().size());
+		assertEquals(new Watermark(Long.MAX_VALUE), harness.getOutput().peek());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/88a97768/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 4fec118..090f7cb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -228,12 +228,12 @@ public class StreamMockEnvironment implements Environment {
 
 	@Override
 	public String getTaskName() {
-		return null;
+		return "";
 	}
 
 	@Override
 	public String getTaskNameWithSubtasks() {
-		return null;
+		return "";
 	}
 
 	@Override