You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2020/09/16 13:52:50 UTC
[flink] branch release-1.10 updated: [FLINK-19109][streaming]
Disallow chaining of ContinuousFileReaderOperator.
This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new 05ff71c [FLINK-19109][streaming] Disallow chaining of ContinuousFileReaderOperator.
05ff71c is described below
commit 05ff71c813650f65604d960978f93ba82f09a48d
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Fri Sep 11 19:19:30 2020 +0200
[FLINK-19109][streaming] Disallow chaining of ContinuousFileReaderOperator.
When CFRO is not actively reading from a split, it's inside some poll loop where no mail is being processed. Hence, downstream operators cannot rely on processing-time timers, among other things, being reliably triggered.
---
docs/release-notes/flink-1.10.md | 6 ++++++
.../source/ContinuousFileReaderOperator.java | 2 ++
.../api/graph/StreamingJobGraphGeneratorTest.java | 20 ++++++++++++++++++++
3 files changed, 28 insertions(+)
diff --git a/docs/release-notes/flink-1.10.md b/docs/release-notes/flink-1.10.md
index 022e0d9..c205002 100644
--- a/docs/release-notes/flink-1.10.md
+++ b/docs/release-notes/flink-1.10.md
@@ -468,3 +468,9 @@ deprecated in favor of `RocksDBOptionsFactory` and
#### Incompatibility of serialized JobGraphs ([FLINK-14594](https://issues.apache.org/jira/browse/FLINK-14594))
Serialized `JobGraphs` which set the `ResourceSpec` created by Flink versions < `1.10` are no longer compatible with Flink >= `1.10`.
If you want to migrate these jobs to Flink >= `1.10` you will have to stop the job with a savepoint and then resume it from this savepoint on the Flink >= `1.10` cluster.
+
+#### Disabled chaining of file reading through ContinuousFileReaderOperator (1.10.3).
+
+Any `readFile` or `readTextFile` call on `StreamExecutionEnvironment` creates a `ContinuousFileReaderOperator` that used to be chained to subsequent operators.
+However, chained operators do not trigger processing-time timers correctly, leading to bugs in watermark assigners (FLINK-19109). As a workaround,
+chaining of `ContinuousFileReaderOperator` is disabled. Flink 1.11.2 and later fix the underlying problem and allow chaining again.
\ No newline at end of file
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index f55141d..4c4de2c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
@@ -80,6 +81,7 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
public ContinuousFileReaderOperator(FileInputFormat<OUT> format) {
this.format = checkNotNull(format);
+ setChainingStrategy(ChainingStrategy.NEVER); // FLINK-19109: while not reading a split, this operator sits in a poll loop that processes no mail, so processing-time timers of chained downstream operators would not fire reliably.
}
@Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index cf76075..4ee6416 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -888,6 +889,25 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
return env.getStreamGraph();
}
+ /**
+ * Verifies the fix for FLINK-19109: ContinuousFileReaderOperator must not be chained to downstream operators (e.g. a
+ * watermark generator in event time), whose processing-time timers would otherwise not be triggered reliably.
+ */
+ @Test
+ public void testContinuousFileReaderOperatorNotChained() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // set parallelism to 2 to avoid chaining with source in case when available processors is 1.
+ env.setParallelism(2);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ env.readTextFile("file:///dummy").print();
+ JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+
+ List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+ assertEquals(3, verticesSorted.size()); // source, reader and print sink each form their own vertex: the reader is chained to neither neighbor
+ }
+
private void assertSameSlotSharingGroup(JobVertex... vertices) {
for (int i = 0; i < vertices.length - 1; i++) {
assertEquals(vertices[i].getSlotSharingGroup(), vertices[i + 1].getSlotSharingGroup());