You are viewing a plain text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/08/01 10:13:46 UTC

[flink] branch release-1.9 updated (9a151d3 -> c44914e)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 9a151d3  [hotfix][e2e] Harden test_stateful_stream_job_upgrade e2e test.
     new 7fdb2a9  [FLINK-13376][datastream] ContinuousFileReaderOperator should respect semantics of BoundedOneInput
     new c44914e  [FLINK-13376][datastream] Unify all no operation sinks with DiscardingSink

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../source/ContinuousFileReaderOperator.java       | 16 +++-
 .../StreamingJobGraphGeneratorNodeHashTest.java    | 42 ++++------
 .../jar/CheckpointedStreamingProgram.java          | 10 +--
 .../jar/StreamingCustomInputSplitProgram.java      | 10 +--
 .../test/classloading/jar/StreamingProgram.java    | 10 +--
 .../api/ContinuousFileReaderOperatorITCase.java    | 98 ++++++++++++++++++++++
 .../streaming/runtime/GlobalAggregateITCase.java   | 17 +---
 7 files changed, 137 insertions(+), 66 deletions(-)
 create mode 100644 flink-tests/src/test/java/org/apache/flink/test/streaming/api/ContinuousFileReaderOperatorITCase.java


[flink] 02/02: [FLINK-13376][datastream] Unify all no operation sinks with DiscardingSink

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c44914e5479941225eda54193697ed7208601db8
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Tue Jul 30 17:35:06 2019 +0800

    [FLINK-13376][datastream] Unify all no operation sinks with DiscardingSink
---
 .../StreamingJobGraphGeneratorNodeHashTest.java    | 42 +++++++++-------------
 .../jar/CheckpointedStreamingProgram.java          | 10 ++----
 .../jar/StreamingCustomInputSplitProgram.java      | 10 ++----
 .../test/classloading/jar/StreamingProgram.java    | 10 ++----
 .../api/ContinuousFileReaderOperatorITCase.java    |  6 ++--
 .../streaming/runtime/GlobalAggregateITCase.java   | 17 ++-------
 6 files changed, 27 insertions(+), 68 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
index 3a44c17..92a67c9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamNode;
@@ -94,7 +93,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 
 		src0.map(new NoOpMapFunction())
 				.union(src1, src2)
-				.addSink(new NoOpSinkFunction()).name("sink");
+				.addSink(new DiscardingSink<>()).name("sink");
 
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
@@ -121,7 +120,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 
 		src0.map(new NoOpMapFunction())
 				.union(src1, src2)
-				.addSink(new NoOpSinkFunction()).name("sink");
+				.addSink(new DiscardingSink<>()).name("sink");
 
 		jobGraph = env.getStreamGraph().getJobGraph();
 
@@ -146,7 +145,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 		DataStream<String> src0 = env.addSource(new NoOpSourceFunction());
 		DataStream<String> src1 = env.addSource(new NoOpSourceFunction());
 
-		src0.union(src1).addSink(new NoOpSinkFunction());
+		src0.union(src1).addSink(new DiscardingSink<>());
 
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
@@ -178,7 +177,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 		env.addSource(new NoOpSourceFunction())
 				.map(new NoOpMapFunction())
 				.filter(new NoOpFilterFunction())
-				.addSink(new NoOpSinkFunction());
+				.addSink(new DiscardingSink<>());
 
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
@@ -192,7 +191,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 				.map(new NoOpMapFunction())
 				.startNewChain()
 				.filter(new NoOpFilterFunction())
-				.addSink(new NoOpSinkFunction());
+				.addSink(new DiscardingSink<>());
 
 		jobGraph = env.getStreamGraph().getJobGraph();
 
@@ -221,7 +220,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 				.map(new NoOpMapFunction()).name("map")
 				.startNewChain()
 				.filter(new NoOpFilterFunction())
-				.addSink(new NoOpSinkFunction());
+				.addSink(new DiscardingSink<>());
 
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
@@ -237,7 +236,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 				.startNewChain()
 				.filter(new NoOpFilterFunction())
 				.startNewChain()
-				.addSink(new NoOpSinkFunction());
+				.addSink(new DiscardingSink<>());
 
 		jobGraph = env.getStreamGraph().getJobGraph();
 
@@ -266,9 +265,9 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 
 		DataStream<String> src = env.addSource(new NoOpSourceFunction());
 
-		src.map(new NoOpMapFunction()).addSink(new NoOpSinkFunction());
+		src.map(new NoOpMapFunction()).addSink(new DiscardingSink<>());
 
-		src.map(new NoOpMapFunction()).addSink(new NoOpSinkFunction());
+		src.map(new NoOpMapFunction()).addSink(new DiscardingSink<>());
 
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 		Set<JobVertexID> vertexIds = new HashSet<>();
@@ -326,11 +325,11 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 				.name("source").uid("source");
 
 		src.map(new NoOpMapFunction())
-				.addSink(new NoOpSinkFunction())
+				.addSink(new DiscardingSink<>())
 				.name("sink0").uid("sink0");
 
 		src.map(new NoOpMapFunction())
-				.addSink(new NoOpSinkFunction())
+				.addSink(new DiscardingSink<>())
 				.name("sink1").uid("sink1");
 
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
@@ -352,13 +351,13 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 		src.map(new NoOpMapFunction())
 				.keyBy(new NoOpKeySelector())
 				.reduce(new NoOpReduceFunction())
-				.addSink(new NoOpSinkFunction())
+				.addSink(new DiscardingSink<>())
 				.name("sink0").uid("sink0");
 
 		src.map(new NoOpMapFunction())
 				.keyBy(new NoOpKeySelector())
 				.reduce(new NoOpReduceFunction())
-				.addSink(new NoOpSinkFunction())
+				.addSink(new DiscardingSink<>())
 				.name("sink1").uid("sink1");
 
 		JobGraph newJobGraph = env.getStreamGraph().getJobGraph();
@@ -386,7 +385,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 
 		env.addSource(new NoOpSourceFunction()).uid("source")
 				.map(new NoOpMapFunction()).uid("source") // Collision
-				.addSink(new NoOpSinkFunction());
+				.addSink(new DiscardingSink<>());
 
 		// This call is necessary to generate the job graph
 		env.getStreamGraph().getJobGraph();
@@ -403,7 +402,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 		env.addSource(new NoOpSourceFunction())
 				// Intermediate chained node
 				.map(new NoOpMapFunction()).uid("map")
-				.addSink(new NoOpSinkFunction());
+				.addSink(new DiscardingSink<>());
 
 		env.getStreamGraph().getJobGraph();
 	}
@@ -418,7 +417,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 
 		env.addSource(new NoOpSourceFunction()).uid("source")
 				.map(new NoOpMapFunction())
-				.addSink(new NoOpSinkFunction());
+				.addSink(new DiscardingSink<>());
 
 		env.getStreamGraph().getJobGraph();
 	}
@@ -547,15 +546,6 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 		}
 	}
 
-	private static class NoOpSinkFunction implements SinkFunction<String> {
-
-		private static final long serialVersionUID = -5654199886203297279L;
-
-		@Override
-		public void invoke(String value) throws Exception {
-		}
-	}
-
 	private static class NoOpMapFunction implements MapFunction<String, String> {
 
 		private static final long serialVersionUID = 6584823409744624276L;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
index 0503c93..6fe2e5a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
 import java.util.Collections;
@@ -48,7 +48,7 @@ public class CheckpointedStreamingProgram {
 		env.disableOperatorChaining();
 
 		DataStream<String> text = env.addSource(new SimpleStringGenerator());
-		text.map(new StatefulMapper()).addSink(new NoOpSink());
+		text.map(new StatefulMapper()).addSink(new DiscardingSink<>());
 		env.setParallelism(1);
 		env.execute("Checkpointed Streaming Program");
 	}
@@ -133,10 +133,4 @@ public class CheckpointedStreamingProgram {
 	private static class SuccessException extends Exception {
 
 	}
-
-	private static class NoOpSink implements SinkFunction<String>{
-		@Override
-		public void invoke(String value) throws Exception {
-		}
-	}
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
index 26fe96a..0904544 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
@@ -31,7 +31,7 @@ import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -60,7 +60,7 @@ public class StreamingCustomInputSplitProgram {
 			public Tuple2<Integer, Double> map(Integer value) throws Exception {
 				return new Tuple2<Integer, Double>(value, value * 0.5);
 			}
-		}).addSink(new NoOpSink());
+		}).addSink(new DiscardingSink<>());
 
 		env.execute();
 	}
@@ -167,10 +167,4 @@ public class StreamingCustomInputSplitProgram {
 			}
 		}
 	}
-
-	private static class NoOpSink implements SinkFunction<Tuple2<Integer, Double>> {
-		@Override
-		public void invoke(Tuple2<Integer, Double> value) throws Exception {
-		}
-	}
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
index 596e4dd..b2414a9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.classloading.jar;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.util.Collector;
 
@@ -42,7 +42,7 @@ public class StreamingProgram {
 		DataStream<Word> counts =
 				text.flatMap(new Tokenizer()).keyBy("word").sum("frequency");
 
-		counts.addSink(new NoOpSink());
+		counts.addSink(new DiscardingSink<>());
 
 		env.execute();
 	}
@@ -95,10 +95,4 @@ public class StreamingProgram {
 			}
 		}
 	}
-
-	private static class NoOpSink implements SinkFunction<Word>{
-		@Override
-		public void invoke(Word value) throws Exception {
-		}
-	}
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/ContinuousFileReaderOperatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/ContinuousFileReaderOperatorITCase.java
index b681114..8d9995f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/ContinuousFileReaderOperatorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/ContinuousFileReaderOperatorITCase.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.streaming.api;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -61,7 +61,7 @@ public class ContinuousFileReaderOperatorITCase extends AbstractTestBase {
 		TestBoundedOneInputStreamOperator checkingOperator = new TestBoundedOneInputStreamOperator(elementCount);
 		DataStream<String> endInputChecking = source.transform("EndInputChecking", STRING_TYPE_INFO, checkingOperator);
 
-		endInputChecking.addSink(new NoOpSink());
+		endInputChecking.addSink(new DiscardingSink<>());
 
 		env.execute("ContinuousFileReaderOperatorITCase.testEndInput");
 	}
@@ -95,6 +95,4 @@ public class ContinuousFileReaderOperatorITCase extends AbstractTestBase {
 			processedElementCount++;
 		}
 	}
-
-	private static class NoOpSink implements SinkFunction<String> {}
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/GlobalAggregateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/GlobalAggregateITCase.java
index 5e2bb7d..0266f9e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/GlobalAggregateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/GlobalAggregateITCase.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.test.util.AbstractTestBase;
@@ -45,7 +45,7 @@ public class GlobalAggregateITCase extends AbstractTestBase {
 
 		streamExecutionEnvironment
 			.addSource(new TestSourceFunction(new IntegerAggregateFunction(), false))
-			.addSink(new NoOpSinkFunction());
+			.addSink(new DiscardingSink<>());
 
 		streamExecutionEnvironment.execute();
 	}
@@ -56,7 +56,7 @@ public class GlobalAggregateITCase extends AbstractTestBase {
 
 		streamExecutionEnvironment
 			.addSource(new TestSourceFunction(new ExceptionThrowingAggregateFunction(), true))
-			.addSink(new NoOpSinkFunction());
+			.addSink(new DiscardingSink<>());
 
 		streamExecutionEnvironment.execute();
 	}
@@ -166,15 +166,4 @@ public class GlobalAggregateITCase extends AbstractTestBase {
 			return add(accumulatorA, accumulatorB);
 		}
 	}
-
-	/**
-	 * Sink function that does nothing.
-	 */
-	private static class NoOpSinkFunction implements SinkFunction<Integer> {
-
-		@Override
-		public void invoke(Integer value, Context context) throws Exception {
-
-		}
-	}
 }


[flink] 01/02: [FLINK-13376][datastream] ContinuousFileReaderOperator should respect semantics of BoundedOneInput

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7fdb2a9256947feaf33460072785f26a104aa864
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Wed Jul 24 21:40:04 2019 +0800

    [FLINK-13376][datastream] ContinuousFileReaderOperator should respect semantics of BoundedOneInput
---
 .../source/ContinuousFileReaderOperator.java       |  16 +++-
 .../api/ContinuousFileReaderOperatorITCase.java    | 100 +++++++++++++++++++++
 2 files changed, 114 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 78e181a..15ee366 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.operators.StreamSourceContexts;
@@ -61,7 +62,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  */
 @Internal
 public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
-	implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT> {
+	implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT>, BoundedOneInput {
 
 	private static final long serialVersionUID = 1L;
 
@@ -196,6 +197,17 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
 	public void close() throws Exception {
 		super.close();
 
+		waitSplitReaderFinished();
+
+		output.close();
+	}
+
+	@Override
+	public void endInput() throws Exception {
+		waitSplitReaderFinished();
+	}
+
+	private void waitSplitReaderFinished() throws InterruptedException {
 		// make sure that we hold the checkpointing lock
 		Thread.holdsLock(checkpointLock);
 
@@ -215,8 +227,8 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
 		if (readerContext != null) {
 			readerContext.emitWatermark(Watermark.MAX_WATERMARK);
 			readerContext.close();
+			readerContext = null;
 		}
-		output.close();
 	}
 
 	private class SplitReader<OT> extends Thread {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/ContinuousFileReaderOperatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/ContinuousFileReaderOperatorITCase.java
new file mode 100644
index 0000000..b681114
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/ContinuousFileReaderOperatorITCase.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.api;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.io.PrintWriter;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Integration tests for {@link org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator}.
+ */
+public class ContinuousFileReaderOperatorITCase extends AbstractTestBase {
+
+	@Test
+	public void testEndInput() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		final File sourceFile = TEMPORARY_FOLDER.newFile();
+		final int elementCount = 10000;
+		try (PrintWriter printWriter = new PrintWriter(sourceFile)) {
+			for (int i = 0; i < elementCount; i++) {
+				printWriter.println(i);
+			}
+		}
+
+		DataStreamSource<String> source = env.readTextFile(sourceFile.getAbsolutePath());
+
+		// check the endInput is invoked at the right time
+		TestBoundedOneInputStreamOperator checkingOperator = new TestBoundedOneInputStreamOperator(elementCount);
+		DataStream<String> endInputChecking = source.transform("EndInputChecking", STRING_TYPE_INFO, checkingOperator);
+
+		endInputChecking.addSink(new NoOpSink());
+
+		env.execute("ContinuousFileReaderOperatorITCase.testEndInput");
+	}
+
+	private static class TestBoundedOneInputStreamOperator extends AbstractStreamOperator<String>
+		implements OneInputStreamOperator<String, String>, BoundedOneInput {
+
+		private final int expectedProcessedElementCount;
+
+		private boolean hasEnded = false;
+
+		private int processedElementCount = 0;
+
+		TestBoundedOneInputStreamOperator(int expectedProcessedElementCount) {
+			// this operator must be chained with ContinuousFileReaderOperator
+			// that way, this end input would be triggered after ContinuousFileReaderOperator
+			chainingStrategy = ChainingStrategy.ALWAYS;
+			this.expectedProcessedElementCount = expectedProcessedElementCount;
+		}
+
+		@Override
+		public void endInput() throws Exception {
+			assertEquals(expectedProcessedElementCount, processedElementCount);
+			hasEnded = true;
+		}
+
+		@Override
+		public void processElement(StreamRecord<String> element) throws Exception {
+			assertFalse(hasEnded);
+			output.collect(element);
+			processedElementCount++;
+		}
+	}
+
+	private static class NoOpSink implements SinkFunction<String> {}
+}