Posted to issues@flink.apache.org by kl0u <gi...@git.apache.org> on 2016/08/10 16:31:33 UTC

[GitHub] flink pull request #2350: [FLINK-4329] Fix Streaming File Source Timestamps/...

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/2350

    [FLINK-4329] Fix Streaming File Source Timestamps/Watermarks Handling

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink continuous_file_fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2350.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2350
    
----
commit 6207a9f5da086d808331afe0e8caf0f03b3fabc5
Author: kl0u <kk...@gmail.com>
Date:   2016-08-09T12:11:45Z

    [FLINK-4329] Fix Streaming File Source Timestamps/Watermarks Handling
    
    Now the ContinuousFileReaderOperator ignores the watermarks sent by
    the source function and emits its own watermarks when we are
    operating on Ingestion time. In addition, and for Ingestion time
    only, the reader also assigns the correct timestamps to the elements
    that it reads.

----
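
In rough outline, the change makes the reader operator behave as in the sketch below (a hedged illustration under assumed names, not the literal patch):

    // Sketch: the reader stops forwarding the upstream source's watermarks;
    // its own reader context (discussed below) emits watermarks instead and,
    // under ingestion time, stamps every element it reads.
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // deliberately a no-op in this sketch: the source's watermarks are ignored
    }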


---

[GitHub] flink pull request #2350: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2350#discussion_r75298028
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---
    @@ -146,16 +146,24 @@ void checkAsyncException() {
     		private final Output<StreamRecord<T>> output;
     		private final StreamRecord<T> reuse;
     
    -		public NonTimestampContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) {
    -			this.owner = owner;
    +		public NonTimestampContext(AbstractStreamOperator<T> owner, Object lockingObject, Output<StreamRecord<T>> output) {
     			this.lockingObject = lockingObject;
     			this.output = output;
     			this.reuse = new StreamRecord<T>(null);
    +
    +			// if it is a source, then we cast and cache it
    +			// here so that we do not have to do it in every collect(),
    +			// collectWithTimestamp() and emitWatermark()
    +
    +			this.owner = (owner instanceof StreamSource) ?
    +				(StreamSource) owner : null;
    --- End diff --
    
    This looks a bit hacky. How about you add an interface, `AsyncException` or so, that all classes using the `NonTimestampContext` can implement?
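    
    A minimal sketch of what such an interface could look like (the name `AsyncExceptionChecker` is the one the thread later converges on; the exact shape is an assumption):
    
        /** Sketch: lets a context ask its owner to rethrow a pending async error. */
        public interface AsyncExceptionChecker {
            void checkAsyncException();
        }
    
    The context could then call `owner.checkAsyncException()` directly, without the `instanceof` check and cast.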


---

[GitHub] flink issue #2350: [FLINK-4329] Fix Streaming File Source Timestamps/Waterma...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2350
  
    This looks very good now! 👍
    
    I'm running it one last time on Travis and then I'm merging.


---

[GitHub] flink issue #2350: [FLINK-4329] Fix Streaming File Source Timestamps/Waterma...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2350
  
    How does this fix work?


---

[GitHub] flink pull request #2350: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2350#discussion_r75303417
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
    @@ -106,6 +109,140 @@ public static void destroyHDFS() {
     	//						TESTS
     
     	@Test
    +	public void testFileReadingOperatorWithIngestionTime() throws Exception {
    +		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
    +		Map<Integer, String> expectedFileContents = new HashMap<>();
    +		for(int i = 0; i < NO_OF_FILES; i++) {
    +			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
    +			filesCreated.add(file.f0);
    +			expectedFileContents.put(i, file.f1);
    +		}
    +
    +		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
    +		TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
    +
    +		ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
    +
    +		StreamConfig streamConfig = new StreamConfig(new Configuration());
    +		streamConfig.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +
    +		ExecutionConfig executionConfig = new ExecutionConfig();
    +		executionConfig.setAutoWatermarkInterval(100);
    +
    +		TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
    +		OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
    +			new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider, streamConfig);
    +
    +		reader.setOutputType(typeInfo, new ExecutionConfig());
    --- End diff --
    
    You can reuse the EC created a few lines above :)


---

[GitHub] flink pull request #2350: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2350#discussion_r75304305
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
    @@ -106,6 +109,140 @@ public static void destroyHDFS() {
     	//						TESTS
     
     	@Test
    +	public void testFileReadingOperatorWithIngestionTime() throws Exception {
    +		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
    +		Map<Integer, String> expectedFileContents = new HashMap<>();
    +		for(int i = 0; i < NO_OF_FILES; i++) {
    +			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
    +			filesCreated.add(file.f0);
    +			expectedFileContents.put(i, file.f1);
    +		}
    +
    +		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
    +		TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
    +
    +		ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
    +
    +		StreamConfig streamConfig = new StreamConfig(new Configuration());
    +		streamConfig.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +
    +		ExecutionConfig executionConfig = new ExecutionConfig();
    +		executionConfig.setAutoWatermarkInterval(100);
    +
    +		TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
    +		OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
    +			new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider, streamConfig);
    +
    +		reader.setOutputType(typeInfo, new ExecutionConfig());
    +		tester.open();
    +
    +		timeServiceProvider.setCurrentTime(0);
    +
    +		long elementTimestamp = 201;
    +		timeServiceProvider.setCurrentTime(elementTimestamp);
    +
    +		// test that a watermark is actually emitted
    +		Assert.assertTrue(tester.getOutput().size() == 1 &&
    +			tester.getOutput().peek() instanceof Watermark &&
    +			((Watermark) tester.getOutput().peek()).getTimestamp() == 200);
    +
    +		// create the necessary splits for the test
    +		FileInputSplit[] splits = format.createInputSplits(
    +			reader.getRuntimeContext().getNumberOfParallelSubtasks());
    +
    +		// and feed them to the operator
    +		for(FileInputSplit split: splits) {
    +			tester.processElement(new StreamRecord<>(split));
    +		}
    +
    +		/*
    +		* Given that the reader is multithreaded, the test finishes before the reader thread finishes
    +		* reading. This results in files being deleted by the test before being read, thus throwing an exception.
    +		* In addition, even if file deletion happens at the end, the results are not ready for testing.
    +		* To face this, we wait until all the output is collected or until the waiting time exceeds 1000 ms, or 1s.
    +		*/
    +
    +		long start = System.currentTimeMillis();
    +		Queue<Object> output;
    +		do {
    +			output = tester.getOutput();
    +			Thread.sleep(50);
    +		} while ((output == null || output.size() != NO_OF_FILES * LINES_PER_FILE) && (System.currentTimeMillis() - start) < 1000);
    --- End diff --
    
    I wonder if this can lead to unstable tests (for example on Travis).
    What if the output needs more than one second to show up?
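    
    A sketch of a more defensive wait loop, with an arbitrary but generous 60 s deadline and a loud failure instead of a silent fall-through:
    
        long deadline = System.currentTimeMillis() + 60_000; // generous bound for slow CI
        Queue<Object> output = tester.getOutput();
        while (output.size() != NO_OF_FILES * LINES_PER_FILE) {
            if (System.currentTimeMillis() > deadline) {
                Assert.fail("Timed out: expected " + (NO_OF_FILES * LINES_PER_FILE)
                    + " elements but saw " + output.size());
            }
            Thread.sleep(50);
            output = tester.getOutput();
        }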


---

[GitHub] flink issue #2350: [FLINK-4329] Fix Streaming File Source Timestamps/Waterma...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2350
  
    I made one inline comment about moving the `SourceContext` and the instantiation code.
    
    Also, the problem with the "async exception check" can be solved by introducing an interface `AsyncExceptionChecker` that is passed to the context. (I think that's what @rmetzger was hinting at.)
    
    Better yet, we might be able to get rid of that stuff entirely by using `task.failExternally()` in all places that previously made these async checks necessary. (The file reader operator already uses that, btw.)
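    
    A hedged sketch of that alternative, assuming the `failExternally(Throwable)` hook on the containing task that the comment refers to (`readSplit` is a hypothetical stand-in for the reader thread's work):
    
        try {
            readSplit(split);
        } catch (Throwable t) {
            // fail the task right away instead of recording the error
            // for a later checkAsyncException() call
            getContainingTask().failExternally(t);
        }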


---

[GitHub] flink pull request #2350: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u closed the pull request at:

    https://github.com/apache/flink/pull/2350


---

[GitHub] flink pull request #2350: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2350#discussion_r75303846
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
    @@ -106,6 +109,140 @@ public static void destroyHDFS() {
     	//						TESTS
     
     	@Test
    +	public void testFileReadingOperatorWithIngestionTime() throws Exception {
    +		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
    +		Map<Integer, String> expectedFileContents = new HashMap<>();
    +		for(int i = 0; i < NO_OF_FILES; i++) {
    +			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
    +			filesCreated.add(file.f0);
    +			expectedFileContents.put(i, file.f1);
    +		}
    +
    +		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
    +		TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
    +
    +		ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
    +
    +		StreamConfig streamConfig = new StreamConfig(new Configuration());
    +		streamConfig.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +
    +		ExecutionConfig executionConfig = new ExecutionConfig();
    +		executionConfig.setAutoWatermarkInterval(100);
    +
    +		TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
    +		OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
    +			new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider, streamConfig);
    +
    +		reader.setOutputType(typeInfo, new ExecutionConfig());
    +		tester.open();
    +
    +		timeServiceProvider.setCurrentTime(0);
    +
    +		long elementTimestamp = 201;
    +		timeServiceProvider.setCurrentTime(elementTimestamp);
    +
    +		// test that a watermark is actually emitted
    +		Assert.assertTrue(tester.getOutput().size() == 1 &&
    +			tester.getOutput().peek() instanceof Watermark &&
    +			((Watermark) tester.getOutput().peek()).getTimestamp() == 200);
    --- End diff --
    
    You don't need to change it, but I think it's a good idea to test the conditions independently. This allows you to see which condition was false, based on the line number.
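    
    Split into independent assertions, the quoted check could read (sketch):
    
        Queue<Object> output = tester.getOutput();
        Assert.assertEquals(1, output.size());
        Object head = output.peek();
        Assert.assertTrue(head instanceof Watermark);
        Assert.assertEquals(200, ((Watermark) head).getTimestamp());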


---

[GitHub] flink issue #2350: [FLINK-4329] Fix Streaming File Source Timestamps/Waterma...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2350
  
    The way it works is that the reader now gets a `ReaderContext` and emits its own watermarks, depending on which time characteristic we are operating on. If it is IngestionTime, which was the original problem, it emits watermarks periodically and also assigns timestamps to the emitted elements.
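    
    For ingestion time specifically, the context does two things, roughly as below (a sketch simplified from the `AutomaticWatermarkContext` discussed in this thread; `timeService` and `onWatermarkTimer` are illustrative names):
    
        public void collect(T element) {
            synchronized (lockingObject) {
                // stamp every element with the current processing time
                output.collect(reuse.replace(element, timeService.getCurrentProcessingTime()));
            }
        }
    
        // fired every watermarkInterval milliseconds
        void onWatermarkTimer(long now) {
            // round down to the interval boundary; this is why the test above
            // sees a watermark of 200 once the time is advanced to 201
            output.emitWatermark(new Watermark(now - (now % watermarkInterval)));
        }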


---

[GitHub] flink pull request #2350: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2350#discussion_r75488070
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
    @@ -103,12 +103,28 @@ public void open() throws Exception {
     		this.format.setRuntimeContext(getRuntimeContext());
     		this.format.configure(new Configuration());
     
    -		this.collector = new TimestampedCollector<>(output);
     		this.checkpointLock = getContainingTask().getCheckpointLock();
     
     		Preconditions.checkState(reader == null, "The reader is already initialized.");
     
    -		this.reader = new SplitReader<>(format, serializer, collector, checkpointLock, readerState);
    +		// set the reader context based on the time characteristic
    --- End diff --
    
    I think both the `SourceContext` (plus its subclasses) and this instantiation code should be moved out of the sources, since they are now used for more than just sources.
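    
    One possible shape for that extraction is a static factory that both `StreamSource` and the file reader call (a sketch; `StreamSourceContexts` is an assumed name and the constructor arguments are elided):
    
        public final class StreamSourceContexts {
            public static <T> SourceFunction.SourceContext<T> getSourceContext(
                    TimeCharacteristic timeCharacteristic /* , time service, lock, output, ... */) {
                switch (timeCharacteristic) {
                    case EventTime:      return new ManualWatermarkContext<>(/* ... */);
                    case IngestionTime:  return new AutomaticWatermarkContext<>(/* ... */);
                    case ProcessingTime: return new NonTimestampContext<>(/* ... */);
                    default: throw new IllegalArgumentException("Unknown time characteristic");
                }
            }
        }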


---

[GitHub] flink pull request #2350: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2350#discussion_r75298087
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---
    @@ -146,16 +146,24 @@ void checkAsyncException() {
     		private final Output<StreamRecord<T>> output;
     		private final StreamRecord<T> reuse;
     
    -		public NonTimestampContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) {
    -			this.owner = owner;
    +		public NonTimestampContext(AbstractStreamOperator<T> owner, Object lockingObject, Output<StreamRecord<T>> output) {
     			this.lockingObject = lockingObject;
     			this.output = output;
     			this.reuse = new StreamRecord<T>(null);
    +
    +			// if it is a source, then we cast and cache it
    +			// here so that we do not have to do it in every collect(),
    +			// collectWithTimestamp() and emitWatermark()
    +
    +			this.owner = (owner instanceof StreamSource) ?
    +				(StreamSource) owner : null;
    --- End diff --
    
    For the file reader, the method can just be empty
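    
    That is, roughly (sketch):
    
        @Override
        public void checkAsyncException() {
            // nothing to check: the reader thread reports failures to the
            // task directly rather than parking them for a later check
        }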



---

[GitHub] flink pull request #2350: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2350#discussion_r75652728
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---
    @@ -188,18 +189,19 @@ public void close() {}
     	 */
     	public static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> {
     
    -		private final StreamSource<?, ?> owner;
    +		private final AbstractStreamOperator<T> owner;
    --- End diff --
    
    This should also be an `AsyncExceptionChecker`, same for the parameter. For the time handling, this can get a `TimeServiceProvider`; that way, things are cleanly separated.
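    
    The constructor would then depend on two narrow collaborators instead of the whole operator, roughly as follows (sketch; the parameter order is an assumption):
    
        public AutomaticWatermarkContext(
                AsyncExceptionChecker checker,
                TimeServiceProvider timeService,
                Object lockingObject,
                Output<StreamRecord<T>> output,
                long watermarkInterval) {
            this.checker = checker;
            this.timeService = timeService;
            this.lockingObject = lockingObject;
            this.output = output;
            this.watermarkInterval = watermarkInterval;
        }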


---

[GitHub] flink issue #2350: [FLINK-4329] Fix Streaming File Source Timestamps/Waterma...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2350
  
    Ah, it seems I was a bit too quick earlier. I added one more inline comment.


---

[GitHub] flink pull request #2350: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2350#discussion_r75297857
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
    @@ -179,7 +195,16 @@ public void close() throws Exception {
     			// called by the StreamTask while having it.
     			checkpointLock.wait();
     		}
    -		collector.close();
    +
    +		// finally if we are closed normally and we are operating on
    +		// event or ingestion time, emit the max watermark indicating
    +		// the end of the stream, like a normal source would do.
    +
    +		readerContext.emitWatermark(Watermark.MAX_WATERMARK);
    +		if (readerContext != null) {
    --- End diff --
    
    If `readerContext` is null, we'll get an NPE on the line before.
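    
    The null-safe ordering would be, roughly (a sketch, assuming the guarded block is what closes the context):
    
        // emit the max watermark to signal end of stream, but only if a
        // context was actually created
        if (readerContext != null) {
            readerContext.emitWatermark(Watermark.MAX_WATERMARK);
            readerContext.close();
            readerContext = null;
        }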


---

[GitHub] flink pull request #2350: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2350#discussion_r75651032
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---
    @@ -214,14 +216,26 @@ public AutomaticWatermarkContext(
     			this.watermarkInterval = watermarkInterval;
     			this.reuse = new StreamRecord<T>(null);
     
    +			// if it is a source, then we cast and cache it
    +			// here so that we do not have to do it in every collect(),
    +			// collectWithTimestamp() and emitWatermark()
    +
    +			if (!(owner instanceof AsyncExceptionChecker)) {
    +				throw new IllegalStateException("The ManualWatermarkContext can only be used " +
    +					"with sources that implement the AsyncExceptionThrower interface.");
    +			}
    +			this.source = (AsyncExceptionChecker) owner;
    +
     			long now = owner.getCurrentProcessingTime();
     			this.watermarkTimer = owner.registerTimer(now + watermarkInterval,
     				new WatermarkEmittingTask(owner, lockingObjectParam, outputParam));
     		}
     
     		@Override
     		public void collect(T element) {
    -			owner.checkAsyncException();
    +			if (source != null) {
    --- End diff --
    
    I don't think these null checks are needed: the constructor already throws the `IllegalStateException` when `owner` is `null` (or not an `AsyncExceptionChecker`), so `source` can never be `null` here.
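    
    With that guarantee, `collect()` shrinks to the following (sketch; the emission itself is elided):
    
        @Override
        public void collect(T element) {
            // the constructor already threw IllegalStateException for an owner
            // that is not an AsyncExceptionChecker, so source cannot be null here
            source.checkAsyncException();
            // ... emit the element under the checkpoint lock, as before ...
        }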


---

[GitHub] flink pull request #2350: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2350#discussion_r75304033
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
    @@ -106,6 +109,140 @@ public static void destroyHDFS() {
     	//						TESTS
     
     	@Test
    +	public void testFileReadingOperatorWithIngestionTime() throws Exception {
    +		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
    +		Map<Integer, String> expectedFileContents = new HashMap<>();
    +		for(int i = 0; i < NO_OF_FILES; i++) {
    +			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
    +			filesCreated.add(file.f0);
    +			expectedFileContents.put(i, file.f1);
    +		}
    +
    +		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
    +		TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
    +
    +		ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
    +
    +		StreamConfig streamConfig = new StreamConfig(new Configuration());
    +		streamConfig.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +
    +		ExecutionConfig executionConfig = new ExecutionConfig();
    +		executionConfig.setAutoWatermarkInterval(100);
    +
    +		TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
    +		OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
    +			new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider, streamConfig);
    +
    +		reader.setOutputType(typeInfo, new ExecutionConfig());
    +		tester.open();
    +
    +		timeServiceProvider.setCurrentTime(0);
    +
    +		long elementTimestamp = 201;
    +		timeServiceProvider.setCurrentTime(elementTimestamp);
    --- End diff --
    
    Can you quickly explain how this works?
    Is the `OneInputStreamOperatorTestHarness` starting a thread in the background emitting watermarks?


---

[GitHub] flink pull request #2350: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2350#discussion_r75647030
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
    @@ -63,22 +66,22 @@
      */
     @Internal
     public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends AbstractStreamOperator<OUT>
    -	implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT> {
    +	implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT>, AsyncExceptionChecker {
     
     	private static final long serialVersionUID = 1L;
     
     	private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
     
    +	@VisibleForTesting
    --- End diff --
    
    This doesn't do anything. It's just a marker annotation.


---