You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/01/23 11:24:17 UTC

[GitHub] [flink] AHeise commented on a change in pull request #10435: [FLINK-13955][runtime] migrate ContinuousFileReaderOperator to the mailbox execution model

AHeise commented on a change in pull request #10435: [FLINK-13955][runtime] migrate ContinuousFileReaderOperator to the mailbox execution model
URL: https://github.com/apache/flink/pull/10435#discussion_r369995184
 
 

 ##########
 File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
 ##########
 @@ -299,34 +198,76 @@ public void open(FileInputSplit fileSplit) throws IOException {
 		}
 
 		@Override
-		public void reopen(FileInputSplit split, Integer state) throws IOException {
+		public void reopen(FileInputSplit split, Integer state) {
 			this.split = split;
 			this.state = state;
 		}
 
 		@Override
-		public Integer getCurrentState() throws IOException {
+		public Integer getCurrentState() {
 			return state;
 		}
 
 		@Override
-		public boolean reachedEnd() throws IOException {
+		public boolean reachedEnd() {
 			if (state == elementsBeforeCheckpoint) {
-				triggerLatch.trigger();
-				if (!waitingLatch.isTriggered()) {
-					try {
-						waitingLatch.await();
-					} catch (InterruptedException e) {
-						e.printStackTrace();
-					}
+				firstChunkTrigger.trigger();
+				try {
+					continueLatch.await();
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+					Thread.currentThread().interrupt();
 				}
 			}
-			return state == linesPerSplit;
+			boolean ended = state == linesPerSplit;
+			if (ended) {
+				endTrigger.trigger();
+			}
+			return ended;
 		}
 
 		@Override
-		public String nextRecord(String reuse) throws IOException {
+		public String nextRecord(String reuse) {
 			return reachedEnd() ? null : split.getSplitNumber() + ": test line " + state++;
 		}
+
+		public void awaitFirstChunkProcessed() throws InterruptedException {
+			firstChunkTrigger.await();
+		}
+
+		public void awaitLastProcessed() throws InterruptedException {
+			endTrigger.await();
+		}
+
+		public void resume() {
+			continueLatch.trigger();
+		}
+	}
+
+	private static final class HarnessWithFormat extends Tuple2<OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String>, BlockingFileInputFormat> {
 
 Review comment:
   Is there a reason to extend Tuple2 instead of using an immutable class with two named fields?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services