You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/10/04 11:02:12 UTC
flink git commit: [FLINK-2802] [streaming] Remove cyclic watermark dependencies for iterations
Repository: flink
Updated Branches:
refs/heads/master 6e0e67d2e -> 88a977689
[FLINK-2802] [streaming] Remove cyclic watermark dependencies for iterations
Closes #1216
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/88a97768
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/88a97768
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/88a97768
Branch: refs/heads/master
Commit: 88a977689aeec67a2e67eb0a1449abef407ed6ec
Parents: 6e0e67d
Author: Gyula Fora <gy...@apache.org>
Authored: Fri Oct 2 19:57:01 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Sun Oct 4 09:33:02 2015 +0200
----------------------------------------------------------------------
.../runtime/io/StreamInputProcessor.java | 3 +-
.../runtime/tasks/StreamIterationHead.java | 9 +++-
.../api/streamtask/StreamIterationHeadTest.java | 51 ++++++++++++++++++++
.../runtime/tasks/StreamMockEnvironment.java | 4 +-
4 files changed, 62 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/88a97768/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index f50ddcd..80563b8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -79,7 +79,7 @@ public class StreamInputProcessor<IN> {
private final DeserializationDelegate<StreamElement> deserializationDelegate;
- @SuppressWarnings({"unchecked", "rawtypes"})
+ @SuppressWarnings("unchecked")
public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
EventListener<CheckpointBarrier> checkpointListener,
CheckpointingMode checkpointMode,
@@ -125,7 +125,6 @@ public class StreamInputProcessor<IN> {
lastEmittedWatermark = Long.MIN_VALUE;
}
- @SuppressWarnings("unchecked")
public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator, Object lock) throws Exception {
if (isFinished) {
return false;
http://git-wip-us.apache.org/repos/asf/flink/blob/88a97768/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index 2ad2d2d..c937e51 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -23,10 +23,10 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,6 +64,13 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
Collection<RecordWriterOutput<OUT>> outputs =
(Collection<RecordWriterOutput<OUT>>) (Collection<?>) outputHandler.getOutputs();
+ // If timestamps are enabled we make sure to remove cyclic watermark dependencies
+ if (getExecutionConfig().areTimestampsEnabled()) {
+ for (RecordWriterOutput<OUT> output : outputs) {
+ output.emitWatermark(new Watermark(Long.MAX_VALUE));
+ }
+ }
+
while (running) {
StreamRecord<OUT> nextRecord = shouldWait ?
dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS) :
http://git-wip-us.apache.org/repos/asf/flink/blob/88a97768/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
new file mode 100644
index 0000000..8f5f8df
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamtask;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.*;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ ResultPartitionWriter.class })
+public class StreamIterationHeadTest {
+
+ @Test
+ public void testIterationHeadWatermarkEmission() throws Exception {
+ StreamIterationHead<Integer> head = new StreamIterationHead<>();
+ StreamTaskTestHarness<Integer> harness = new StreamTaskTestHarness<>(head,
+ BasicTypeInfo.INT_TYPE_INFO);
+ harness.getStreamConfig().setIterationId("1");
+ harness.getStreamConfig().setIterationWaitTime(1);
+
+ harness.invoke();
+ harness.waitForTaskCompletion();
+
+ assertEquals(1, harness.getOutput().size());
+ assertEquals(new Watermark(Long.MAX_VALUE), harness.getOutput().peek());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/88a97768/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 4fec118..090f7cb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -228,12 +228,12 @@ public class StreamMockEnvironment implements Environment {
@Override
public String getTaskName() {
- return null;
+ return "";
}
@Override
public String getTaskNameWithSubtasks() {
- return null;
+ return "";
}
@Override