You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2020/09/16 13:52:50 UTC

[flink] branch release-1.10 updated: [FLINK-19109][streaming] Disallow chaining of ContinuousFileReaderOperator.

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 05ff71c  [FLINK-19109][streaming] Disallow chaining of ContinuousFileReaderOperator.
05ff71c is described below

commit 05ff71c813650f65604d960978f93ba82f09a48d
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Fri Sep 11 19:19:30 2020 +0200

    [FLINK-19109][streaming] Disallow chaining of ContinuousFileReaderOperator.
    
    When CFRO is not actively reading from a split, it's inside some poll loop where no mail is being processed. Hence, downstream operators cannot rely on processing timers among other things being reliably triggered.
---
 docs/release-notes/flink-1.10.md                     |  6 ++++++
 .../source/ContinuousFileReaderOperator.java         |  2 ++
 .../api/graph/StreamingJobGraphGeneratorTest.java    | 20 ++++++++++++++++++++
 3 files changed, 28 insertions(+)

diff --git a/docs/release-notes/flink-1.10.md b/docs/release-notes/flink-1.10.md
index 022e0d9..c205002 100644
--- a/docs/release-notes/flink-1.10.md
+++ b/docs/release-notes/flink-1.10.md
@@ -468,3 +468,9 @@ deprecated in favor of `RocksDBOptionsFactory` and
 #### Incompatibility of serialized JobGraphs ([FLINK-14594](https://issues.apache.org/jira/browse/FLINK-14594))
 Serialized `JobGraphs` which set the `ResourceSpec` created by Flink versions < `1.10` are no longer compatible with Flink >= `1.10`. 
 If you want to migrate these jobs to Flink >= `1.10` you will have to stop the job with a savepoint and then resume it from this savepoint on the Flink >= `1.10` cluster.
+
+#### Disabled chaining of file reading through ContinuousFileReaderOperator (1.10.3).
+     
+Any `readFile` or `readTextFile` of `DataStream` creates a `ContinuousFileReaderOperator` that used to be chained to subsequent operators.
+However, chained operators do not trigger processing time timers correctly, leading to bugs in watermark assigners (FLINK-19109). As a workaround,
+chaining of `ContinuousFileReaderOperator` is disabled. Flink 1.11.2 and later fixes the underlying problem and re-allows chaining. 
\ No newline at end of file
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index f55141d..4c4de2c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.operators.StreamSourceContexts;
@@ -80,6 +81,7 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
 
 	public ContinuousFileReaderOperator(FileInputFormat<OUT> format) {
 		this.format = checkNotNull(format);
+		setChainingStrategy(ChainingStrategy.NEVER);
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index cf76075..4ee6416 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -888,6 +889,25 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		return env.getStreamGraph();
 	}
 
+	/**
+	 * Verifies fix for FLINK-19109, where WatermarkGenerator cannot be chained to ContinuousFileReaderOperator in event
+	 * time.
+	 */
+	@Test
+	public void testContinuousFileReaderOperatorNotChained() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// set parallelism to 2 to avoid chaining with the source when only 1 processor is available.
+		env.setParallelism(2);
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		env.readTextFile("file:///dummy").print();
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+
+		List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+		assertEquals(3, verticesSorted.size());
+	}
+
 	private void assertSameSlotSharingGroup(JobVertex... vertices) {
 		for (int i = 0; i < vertices.length - 1; i++) {
 			assertEquals(vertices[i].getSlotSharingGroup(), vertices[i + 1].getSlotSharingGroup());