Posted to issues@flink.apache.org by kl0u <gi...@git.apache.org> on 2016/09/25 15:30:11 UTC

[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/2546

    [FLINK-4329] Fix Streaming File Source Timestamps/Watermarks Handling

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink fix_ingestion_time

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2546.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2546
    
----
commit 1b15b77b80334adf869714937dbfa8d8b7c2e12f
Author: kl0u <kk...@gmail.com>
Date:   2016-08-25T15:38:49Z

    [FLINK-4329] Fix Streaming File Source Timestamps/Watermarks Handling

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2546#discussion_r80904818
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
    @@ -224,7 +327,7 @@ public void testFilePathFiltering() throws Exception {
     		monitoringFunction.open(new Configuration());
     		monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));
     
    -		Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
    +		Assert.assertEquals(uniqFilesFound.size(), NO_OF_FILES);
    --- End diff --
    
    `assertEquals()` takes "expected" first and "actual" second.
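The argument order matters mostly for the failure message. The sketch below uses a small stand-in for `assertEquals(long expected, long actual)` so that it runs without JUnit on the classpath; the helper, the class name and the two values are assumptions made for illustration, and only the message format is meant to resemble JUnit 4.

```java
// Stand-in for JUnit 4's Assert.assertEquals(long expected, long actual),
// written here only so the sketch runs without a test framework.
final class ArgumentOrderSketch {

    static void assertEquals(long expected, long actual) {
        if (expected != actual) {
            throw new AssertionError("expected:<" + expected + "> but was:<" + actual + ">");
        }
    }

    // Returns the failure message produced by the call, or null if it passed.
    static String failureMessage(long first, long second) {
        try {
            assertEquals(first, second);
            return null;
        } catch (AssertionError e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        final long expectedFiles = 6; // hypothetical value of NO_OF_FILES
        final long filesFound = 5;    // hypothetical value of uniqFilesFound.size()

        // Swapped arguments: the message reports the actual value as "expected".
        System.out.println(failureMessage(filesFound, expectedFiles));
        // Expected first, actual second: the message reads correctly.
        System.out.println(failureMessage(expectedFiles, filesFound));
    }
}
```

Both calls fail for the same reason, but only the second message tells the reader which number the test wanted.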



[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2546#discussion_r80901983
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java ---
    @@ -99,7 +100,7 @@ public void run() {
     					target.trigger(timestamp);
     				} catch (Throwable t) {
     					TimerException asyncException = new TimerException(t);
    --- End diff --
    
    Do we need this extra level of exception wrapping?



[GitHub] flink issue #2546: [FLINK-4329] Fix Streaming File Source Timestamps/Waterma...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2546
  
    Hi @aljoscha, the problem with the AlignedWindowOperator tests is that they were using the DefaultTimeServiceProvider without shutting down the service, so the previous timers would fire and throw an NPE because the reference to the operator they held had been invalidated.
    
    The restriction to TestTimeServiceProvider was made because the DefaultTimeServiceProvider now needs the checkpointLock, so we either add it as an argument to the same constructor, or restrict the options to only the TestTimeServiceProvider.
    
    Finally for the StreamConfig I agree that it is only needed for one test so we can just expose it. 
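The timer problem described in the first paragraph can be reproduced with a plain `ScheduledExecutorService`. This is only a sketch under the assumption that the time service is backed by such an executor; the class and method names are made up for illustration and are not taken from the pull request.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class TimerShutdownSketch {

    // Schedules one timer, shuts the executor down before the timer is due,
    // and reports whether the callback still ran.
    static boolean timerFired(boolean cancelPendingTimers) {
        ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();
        AtomicBoolean fired = new AtomicBoolean(false);

        timers.schedule(() -> fired.set(true), 200, TimeUnit.MILLISECONDS);

        if (cancelPendingTimers) {
            // shutdownNow() drops tasks that are still waiting for their delay.
            timers.shutdownNow();
        } else {
            // shutdown() by default still runs delayed tasks that were already scheduled.
            timers.shutdown();
        }

        try {
            timers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for timers", e);
        }
        return fired.get();
    }

    public static void main(String[] args) {
        System.out.println("fired after shutdownNow(): " + timerFired(true));
        System.out.println("fired after shutdown():    " + timerFired(false));
    }
}
```

A timer that survives the end of a test can run against state the test has already torn down, which is the kind of late NPE mentioned above.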



[GitHub] flink issue #2546: [FLINK-4329] Fix Streaming File Source Timestamps/Waterma...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2546
  
    All in all some minor change requests, otherwise this seems good.



[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2546#discussion_r81088167
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java ---
    @@ -99,7 +100,7 @@ public void run() {
     					target.trigger(timestamp);
     				} catch (Throwable t) {
     					TimerException asyncException = new TimerException(t);
    --- End diff --
    
    No. This is just because this is how it was before. I will remove it.



[GitHub] flink issue #2546: [FLINK-4329] Fix Streaming File Source Timestamps/Waterma...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2546
  
    Just a quick comment (I didn't review all code): Why does this touch the AlignedWindowOperator tests? I would like to keep this commit as small as possible because we're dealing with sensitive stuff where I'd like to clearly separate things.
    
    In `OneInputStreamOperatorTestHarness` and `KeyedOneInputStreamOperatorTestHarness`, restricting the time provider parameter to a `TestTimeServiceProvider` does not change anything, right? So I think we can leave it as is. Also in `OneInputStreamOperatorTestHarness` the additional `TimeCharacteristic` parameter is only useful for one specific test so I think it would be better to instead expose the `StreamConfig` and set the parameter there for the one test to keep the number of constructors manageable. 
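A minimal sketch of the second suggestion, using hypothetical stand-in classes rather than Flink's actual `StreamConfig` or test harness: the harness keeps a single constructor and exposes its configuration, so the one test that needs a different time characteristic sets it after construction.

```java
final class HarnessConfigSketch {

    // Hypothetical stand-in for the time characteristic setting.
    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    // Hypothetical stand-in for a mutable stream configuration.
    static final class StreamConfig {
        private TimeCharacteristic timeCharacteristic = TimeCharacteristic.ProcessingTime;

        void setTimeCharacteristic(TimeCharacteristic characteristic) {
            this.timeCharacteristic = characteristic;
        }

        TimeCharacteristic getTimeCharacteristic() {
            return timeCharacteristic;
        }
    }

    // Hypothetical harness: one constructor, configuration reachable through a getter.
    static final class TestHarness {
        private final StreamConfig config = new StreamConfig();

        StreamConfig getStreamConfig() {
            return config;
        }
    }

    public static void main(String[] args) {
        TestHarness harness = new TestHarness();
        // Only the test that needs ingestion time touches the configuration.
        harness.getStreamConfig().setTimeCharacteristic(TimeCharacteristic.IngestionTime);
        System.out.println(harness.getStreamConfig().getTimeCharacteristic());
    }
}
```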




[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2546#discussion_r80901399
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java ---
    @@ -109,9 +110,15 @@ public void run() {
     	public static DefaultTimeServiceProvider createForTesting(ScheduledExecutorService executor, Object checkpointLock) {
     		return new DefaultTimeServiceProvider(new AsyncExceptionHandler() {
     			@Override
    -			public void registerAsyncException(AsynchronousException exception) {
    +			public void handleAsyncException(String message, Throwable exception) {
     				exception.printStackTrace();
     			}
     		}, executor, checkpointLock);
     	}
    +
    +	@VisibleForTesting
    +	public static DefaultTimeServiceProvider createForTestingWithHandler(
    --- End diff --
    
    Is this the exact same code as the default constructor? Can it be removed?



[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2546#discussion_r81089031
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java ---
    @@ -99,7 +100,7 @@ public void run() {
     					target.trigger(timestamp);
     				} catch (Throwable t) {
     					TimerException asyncException = new TimerException(t);
    --- End diff --
    
    Although, now that I think about it, it is good to know that it came from a timer callback. What do you think?
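The trade-off can be sketched as follows. `TimerException` and `AsyncExceptionHandler` here are simplified stand-ins defined inside the sketch (the handler only mirrors the `handleAsyncException(String, Throwable)` signature from the diff), and the message strings are assumptions.

```java
final class TimerExceptionSketch {

    // Simplified stand-in for the TimerException in the diff.
    static final class TimerException extends RuntimeException {
        TimerException(Throwable cause) {
            super("Exception in timer callback", cause);
        }
    }

    // Stand-in mirroring the renamed handler method from the diff.
    interface AsyncExceptionHandler {
        void handleAsyncException(String message, Throwable exception);
    }

    // Runs a timer callback and reports any failure, wrapped so that the
    // receiver can tell it originated in a timer.
    static void trigger(Runnable callback, AsyncExceptionHandler handler) {
        try {
            callback.run();
        } catch (Throwable t) {
            TimerException wrapped = new TimerException(t);
            handler.handleAsyncException("Caught exception while processing timer.", wrapped);
        }
    }

    public static void main(String[] args) {
        trigger(
            () -> { throw new IllegalStateException("boom"); },
            (message, exception) -> {
                // The wrapper names the origin; the cause keeps the original failure.
                System.out.println(message + " " + exception + " caused by " + exception.getCause());
            });
    }
}
```

The extra level costs one more frame in the stack trace, while the original exception stays reachable through `getCause()`.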



[GitHub] flink issue #2546: [FLINK-4329] Fix Streaming File Source Timestamps/Waterma...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2546
  
    I added some more comments. I could not find anywhere in that test a check that elements are not late, i.e. that they are properly interleaved with the watermarks.
    
    Is there a test that checks that the reader does not let LongMax watermarks pass through? Or that the split generating task does not emit a long-max watermark on exit?
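One possible shape for such a check is sketched below. `Record` and `Watermark` are hypothetical minimal stand-ins (not Flink's `StreamRecord` and `Watermark`), and the sketch assumes that an element counts as late when its timestamp is not greater than the last watermark seen before it.

```java
import java.util.Arrays;
import java.util.List;

final class WatermarkOrderSketch {

    // Hypothetical minimal stand-in for a timestamped element.
    static final class Record {
        final long timestamp;
        Record(long timestamp) { this.timestamp = timestamp; }
    }

    // Hypothetical minimal stand-in for a watermark.
    static final class Watermark {
        final long timestamp;
        Watermark(long timestamp) { this.timestamp = timestamp; }
    }

    // True if no record is late, watermarks never go backwards, and the
    // Long.MAX_VALUE watermark appears exactly once, as the very last item.
    static boolean isProperlyInterleaved(List<Object> output) {
        long lastWatermark = Long.MIN_VALUE;
        for (int i = 0; i < output.size(); i++) {
            Object item = output.get(i);
            if (item instanceof Watermark) {
                long ts = ((Watermark) item).timestamp;
                boolean isLastItem = i == output.size() - 1;
                if (ts < lastWatermark || (ts == Long.MAX_VALUE && !isLastItem)) {
                    return false;
                }
                lastWatermark = ts;
            } else if (item instanceof Record) {
                if (((Record) item).timestamp <= lastWatermark) {
                    return false; // the record is late
                }
            } else {
                return false; // unknown element type
            }
        }
        return lastWatermark == Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        List<Object> output = Arrays.asList(
            new Watermark(200), new Record(201),
            new Watermark(300), new Record(350),
            new Watermark(Long.MAX_VALUE));
        System.out.println(isProperlyInterleaved(output));
    }
}
```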



[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2546



[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2546#discussion_r80902027
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java ---
    @@ -18,12 +18,14 @@
     package org.apache.flink.streaming.runtime.tasks;
     
     /**
    - * Interface for reporting exceptions that are thrown in (possibly) a different thread.
    + * An interface marking a task as capable of handling exceptions thrown
    + * by different threads, other than the one executing the task itself.
      */
     public interface AsyncExceptionHandler {
     
     	/**
    -	 * Registers the given exception.
    +	 * Handles an exception thrown by another thread (e.g. a TriggerTask),
    +	 * other than the one executing the main task.
     	 */
    -	void registerAsyncException(AsynchronousException exception);
    +	void handleAsyncException(String message, Throwable exception);
    --- End diff --
    
    This name change is good!



[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2546#discussion_r81204726
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
    @@ -440,10 +491,50 @@ public void testFileSplitMonitoringProcessOnce() throws Exception {
     
     	private int getLineNo(String line) {
     		String[] tkns = line.split("\\s");
    -		Assert.assertEquals(tkns.length, 6);
    +		Assert.assertEquals(6, tkns.length);
     		return Integer.parseInt(tkns[tkns.length - 1]);
     	}
     
    +	private class TimeUpdatingThread extends Thread {
    +
    +		private volatile boolean isRunning;
    +
    +		private final TestTimeServiceProvider timeServiceProvider;
    +		private final OneInputStreamOperatorTestHarness testHarness;
    +		private final long wmInterval;
    +		private final int elementUntilUpdating;
    +
    +		TimeUpdatingThread(final TestTimeServiceProvider timeServiceProvider,
    +						   final OneInputStreamOperatorTestHarness testHarness,
    +						   final long wmInterval,
    +						   final int elementUntilUpdating) {
    +
    +			this.timeServiceProvider = timeServiceProvider;
    +			this.testHarness = testHarness;
    +			this.wmInterval = wmInterval;
    +			this.elementUntilUpdating = elementUntilUpdating;
    +			this.isRunning = true;
    +		}
    +
    +		@Override
    +		public void run() {
    +			try {
    +				while (isRunning) {
    +					if (testHarness.getOutput().size() % elementUntilUpdating == 0) {
    --- End diff --
    
    There is a "race" between the operator emitting elements and this thread. Both run in loops without delays. Time only advances if this condition happens to be evaluated at the exact moment when the output list holds a multiple of `elementUntilUpdating` elements.



[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2546#discussion_r80916240
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
    @@ -106,6 +107,117 @@ public static void destroyHDFS() {
     	//						TESTS
     
     	@Test
    +	public void testFileReadingOperatorWithIngestionTime() throws Exception {
    +		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
    +		Map<Integer, String> expectedFileContents = new HashMap<>();
    +		for(int i = 0; i < NO_OF_FILES; i++) {
    +			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
    +			filesCreated.add(file.f0);
    +			expectedFileContents.put(i, file.f1);
    +		}
    +
    +		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
    +		TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
    +
    +		ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
    +
    +		ExecutionConfig executionConfig = new ExecutionConfig();
    +		executionConfig.setAutoWatermarkInterval(100);
    +
    +		TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
    +		OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
    +			new OneInputStreamOperatorTestHarness<>(reader, executionConfig,
    +				timeServiceProvider, TimeCharacteristic.IngestionTime);
    +
    +		reader.setOutputType(typeInfo, executionConfig);
    +		tester.open();
    +
    +		// test that watermarks are correctly emitted
    +
    +		timeServiceProvider.setCurrentTime(201);
    +		timeServiceProvider.setCurrentTime(301);
    +		timeServiceProvider.setCurrentTime(401);
    +		timeServiceProvider.setCurrentTime(501);
    +
    +		int i = 0;
    +		for(Object line: tester.getOutput()) {
    +			if (!(line instanceof Watermark)) {
    +				Assert.fail("Only watermarks are expected here ");
    +			}
    +			Watermark w = (Watermark) line;
    +			Assert.assertEquals(w.getTimestamp(), 200 + (i * 100));
    +			i++;
    +		}
    +
    +		// clear the output to get the elements only and the final watermark
    +		tester.getOutput().clear();
    +		Assert.assertEquals(tester.getOutput().size(), 0);
    +
    +		// create the necessary splits for the test
    +		FileInputSplit[] splits = format.createInputSplits(
    +			reader.getRuntimeContext().getNumberOfParallelSubtasks());
    +
    +		// and feed them to the operator
    +		for(FileInputSplit split: splits) {
    +			tester.processElement(new StreamRecord<>(split));
    +		}
    +
    +		// then close the reader gracefully so that
    +		// we wait until all input is read
    +		synchronized (tester.getCheckpointLock()) {
    +			tester.close();
    +		}
    +
    +		for(org.apache.hadoop.fs.Path file: filesCreated) {
    +			hdfs.delete(file, false);
    +		}
    +
    +		// the lines received must be the elements in the files +1 for the Long.MAX_VALUE watermark
    +		Assert.assertEquals(tester.getOutput().size(), NO_OF_FILES * LINES_PER_FILE + 1);
    +
    +		// put the elements read in a map by file they belong to
    +		Map<Integer, List<String>> actualFileContents = new HashMap<>();
    +		for(Object line: tester.getOutput()) {
    +			if (line instanceof StreamRecord) {
    +				StreamRecord<String> element = (StreamRecord<String>) line;
    +				Assert.assertEquals(element.getTimestamp(), 501);
    +
    +				int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
    +				List<String> content = actualFileContents.get(fileIdx);
    +				if (content == null) {
    +					content = new ArrayList<>();
    +					actualFileContents.put(fileIdx, content);
    +				}
    +				content.add(element.getValue() + "\n");
    +			} else if (line instanceof Watermark) {
    +				Assert.assertEquals(((Watermark) line).getTimestamp(), Long.MAX_VALUE);
    --- End diff --
    
    Does the test assume that all watermarks emitted by the reader are LongMax? I am confused here, isn't that exactly what should NOT happen? Otherwise all emitted elements are late?



[GitHub] flink issue #2546: [FLINK-4329] Fix Streaming File Source Timestamps/Waterma...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2546
  
    Thanks for the comments @StephanEwen and @aljoscha ! 
    I integrated most of them. 
    Please have a look.



[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2546#discussion_r80901203
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java ---
    @@ -209,6 +209,11 @@ public void testWindowTriggerTimeAlignment() throws Exception {
     			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
     			op.dispose();
     
    +			timerService.shutdownService();
    --- End diff --
    
    Why does this need to create and shut down a timer service every time?



[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2546#discussion_r80901242
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---
    @@ -201,6 +201,11 @@ public void testWindowTriggerTimeAlignment() throws Exception {
     			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
     			op.dispose();
     
    +			timerService.shutdownService();
    --- End diff --
    
    Same here



[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2546#discussion_r81204828
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
    @@ -440,10 +491,50 @@ public void testFileSplitMonitoringProcessOnce() throws Exception {
     
     	private int getLineNo(String line) {
     		String[] tkns = line.split("\\s");
    -		Assert.assertEquals(tkns.length, 6);
    +		Assert.assertEquals(6, tkns.length);
     		return Integer.parseInt(tkns[tkns.length - 1]);
     	}
     
    +	private class TimeUpdatingThread extends Thread {
    +
    +		private volatile boolean isRunning;
    +
    +		private final TestTimeServiceProvider timeServiceProvider;
    +		private final OneInputStreamOperatorTestHarness testHarness;
    +		private final long wmInterval;
    +		private final int elementUntilUpdating;
    +
    +		TimeUpdatingThread(final TestTimeServiceProvider timeServiceProvider,
    +						   final OneInputStreamOperatorTestHarness testHarness,
    +						   final long wmInterval,
    +						   final int elementUntilUpdating) {
    +
    +			this.timeServiceProvider = timeServiceProvider;
    +			this.testHarness = testHarness;
    +			this.wmInterval = wmInterval;
    +			this.elementUntilUpdating = elementUntilUpdating;
    +			this.isRunning = true;
    +		}
    +
    +		@Override
    +		public void run() {
    +			try {
    +				while (isRunning) {
    +					if (testHarness.getOutput().size() % elementUntilUpdating == 0) {
    +						long now = timeServiceProvider.getCurrentProcessingTime();
    +						timeServiceProvider.setCurrentTime(now + wmInterval);
    +					}
    +				}
    +			} catch (Exception e) {
    +				e.printStackTrace();
    --- End diff --
    
    This will not result in any meaningful feedback to the test.
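A common way to surface such failures is to record the exception in the helper thread and rethrow it from the test thread after `join()`. The sketch below is one such pattern; `CheckedThread` and its method names are hypothetical and not part of the pull request.

```java
import java.util.concurrent.atomic.AtomicReference;

final class FailingThreadSketch {

    // Hypothetical helper thread that records its failure instead of printing it.
    static final class CheckedThread extends Thread {
        private final AtomicReference<Throwable> error = new AtomicReference<>();
        private final Runnable body;

        CheckedThread(Runnable body) {
            this.body = body;
        }

        @Override
        public void run() {
            try {
                body.run();
            } catch (Throwable t) {
                error.set(t);
            }
        }

        // Called from the test thread: waits for the helper and rethrows its failure.
        void joinAndRethrow() {
            try {
                join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting for helper thread", e);
            }
            Throwable t = error.get();
            if (t != null) {
                throw new AssertionError("helper thread failed", t);
            }
        }
    }

    public static void main(String[] args) {
        CheckedThread helper = new CheckedThread(() -> {
            throw new IllegalStateException("boom");
        });
        helper.start();
        try {
            helper.joinAndRethrow();
        } catch (AssertionError e) {
            // In a real test this error would simply propagate and fail the test.
            System.out.println(e.getMessage() + ": " + e.getCause());
        }
    }
}
```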



[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2546#discussion_r81204990
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
    @@ -190,12 +213,30 @@ public void testFileReadingOperatorWithIngestionTime() throws Exception {
     				}
     				content.add(element.getValue() + "\n");
     			} else if (line instanceof Watermark) {
    -				Assert.assertEquals(((Watermark) line).getTimestamp(), Long.MAX_VALUE);
    +				watermarkTimestamps.add(((Watermark) line).getTimestamp());
     			} else {
     				Assert.fail("Unknown element in the list.");
     			}
     		}
     
    +		// check if all the input was read
    +		Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, noOfLines);
    +
    +		// check if the last element is the LongMax watermark
    +		Assert.assertTrue(lastElement instanceof Watermark);
    +		Assert.assertEquals(Long.MAX_VALUE, ((Watermark) lastElement).getTimestamp());
    +
    +		System.out.println(watermarkTimestamps.size());
    --- End diff --
    
    Leftover sysout printing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2546: [FLINK-4329] Fix Streaming File Source Timestamps/Waterma...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2546
  
    Hi @StephanEwen. If I understand correctly, your suggestion is to structure the test roughly as follows:
    
    1) put the split in the reader
    2) read the split
    3) when the split finishes, update the time in the provider
    4) observe the time in the output elements
    
    If so, the problem is that the reader only puts the split in a queue, and a separate thread picks it up and reads it. There is no way of knowing when the reading thread has finished one split and moved on to the next, so step 3) cannot be synchronized correctly. This is why the test just has a thread that tries (without guarantees, hence the race condition you mentioned) to update the time while the reader is still reading. Any suggestions are welcome.



[GitHub] flink pull request #2546: [FLINK-4329] Fix Streaming File Source Timestamps/...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2546#discussion_r80915830
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
    @@ -106,6 +107,117 @@ public static void destroyHDFS() {
     	//						TESTS
     
     	@Test
    +	public void testFileReadingOperatorWithIngestionTime() throws Exception {
    +		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
    +		Map<Integer, String> expectedFileContents = new HashMap<>();
    +		for(int i = 0; i < NO_OF_FILES; i++) {
    +			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
    +			filesCreated.add(file.f0);
    +			expectedFileContents.put(i, file.f1);
    +		}
    +
    +		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
    +		TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
    +
    +		ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
    +
    +		ExecutionConfig executionConfig = new ExecutionConfig();
    +		executionConfig.setAutoWatermarkInterval(100);
    +
    +		TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
    +		OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
    +			new OneInputStreamOperatorTestHarness<>(reader, executionConfig,
    +				timeServiceProvider, TimeCharacteristic.IngestionTime);
    +
    +		reader.setOutputType(typeInfo, executionConfig);
    +		tester.open();
    +
    +		// test that watermarks are correctly emitted
    +
    +		timeServiceProvider.setCurrentTime(201);
    +		timeServiceProvider.setCurrentTime(301);
    +		timeServiceProvider.setCurrentTime(401);
    +		timeServiceProvider.setCurrentTime(501);
    +
    +		int i = 0;
    +		for(Object line: tester.getOutput()) {
    +			if (!(line instanceof Watermark)) {
    +				Assert.fail("Only watermarks are expected here ");
    +			}
    +			Watermark w = (Watermark) line;
    +			Assert.assertEquals(w.getTimestamp(), 200 + (i * 100));
    +			i++;
    +		}
    +
    +		// clear the output to get the elements only and the final watermark
    +		tester.getOutput().clear();
    +		Assert.assertEquals(tester.getOutput().size(), 0);
    +
    +		// create the necessary splits for the test
    +		FileInputSplit[] splits = format.createInputSplits(
    --- End diff --
    
    What will `getNumberOfParallelSubtasks()` be here? The test does not control the number of splits, but leaves this to the implicit behavior of the test harness?



[GitHub] flink issue #2546: [FLINK-4329] Fix Streaming File Source Timestamps/Waterma...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2546
  
    Hi @StephanEwen , thanks for the review!
    
    The watermarks/timestamps are now generated by the Reader, and not the operator that creates the splits. The same holds for the LongMax watermark, which is created at the close() of the ContinuousFileReaderOperator. 
    
    As for tests, it is the testFileReadingOperatorWithIngestionTime() in the ContinuousFileMonitoringTest which checks if the last Watermark is the LongMax.
    
    The original problem was that there were no timestamps assigned to the elements for Ingestion time and watermarks were emitted (I think it was a Process_once case).
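For reference, the test in the diff sets the time to 201, 301, 401 and 501 with a watermark interval of 100 and expects watermarks at 200 + (i * 100). One rule that is consistent with those numbers is rounding the current time down to a multiple of the interval. The sketch below only illustrates that arithmetic; it is an assumption and is not confirmed by this thread to be what the reader implements.

```java
final class IngestionTimeSketch {

    // Rounds the current time down to a multiple of the watermark interval.
    static long alignedWatermark(long now, long interval) {
        return now - (now % interval);
    }

    public static void main(String[] args) {
        long interval = 100; // matches setAutoWatermarkInterval(100) in the test
        long[] times = {201, 301, 401, 501};
        for (long now : times) {
            System.out.println(now + " -> " + alignedWatermark(now, interval));
        }
    }
}
```

Under this rule an element stamped with ingestion time 501 arrives after the watermark 500 and is therefore not late.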
    




[GitHub] flink issue #2546: [FLINK-4329] Fix Streaming File Source Timestamps/Waterma...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2546
  
    Actually, let me take a step back and understand a few things deeper, first.
    Who actually generates the watermarks (in ingestion time)? The operator that creates the file splits, or the operator that reads the splits?
    
    If the configuration is set to IngestionTime, will the operator that creates the file splits emit a final LongMax watermark? Is that one passed through by the split-reading operator? Is there a test that tests that specific scenario? (I believe it was the initially reported bug).

