You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/11 13:09:12 UTC

[3/3] flink git commit: [FLINK-5021] Remove the special EOS TimestampedFileInputSplit.

[FLINK-5021] Remove the special EOS TimestampedFileInputSplit.

Instead of enqueuing a special split to signal that no more splits
will arrive, the ContinuousFileReaderOperator now closes its reader
by setting a flag that marks it as closed. The reader exits once the
flag is set and the pending split queue is empty.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98a61762
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98a61762
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98a61762

Branch: refs/heads/master
Commit: 98a6176280dd7b85dcd6fbacd324eb1056e23419
Parents: 9918839
Author: kl0u <kk...@gmail.com>
Authored: Thu Nov 3 10:50:04 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Nov 11 14:05:58 2016 +0100

----------------------------------------------------------------------
 .../source/ContinuousFileReaderOperator.java    | 53 ++++++++++----------
 .../source/TimestampedFileInputSplit.java       | 25 ++++-----
 .../TimestampedFileInputSplitTest.java          | 25 ---------
 3 files changed, 37 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/98a61762/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 19e4737..db8e8fd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -42,12 +42,10 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Queue;
 
-import static org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit.EOS;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -162,22 +160,18 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
 	public void close() throws Exception {
 		super.close();
 
-		// signal that no more splits will come, wait for the reader to finish
-		// and close the collector. Further cleaning up is handled by the dispose().
+		// close the reader to signal that no more splits will come. By doing this,
+		// the reader will exit as soon as it finishes processing the already pending splits.
+		// This method will wait until then. Further cleaning up is handled by the dispose().
 
 		if (reader != null && reader.isAlive() && reader.isRunning()) {
-			// add a dummy element to signal that no more splits will
-			// arrive and wait until the reader finishes
-			reader.addSplit(EOS);
-
-			// we already have the checkpoint lock because close() is
-			// called by the StreamTask while having it.
+			reader.close();
 			checkpointLock.wait();
 		}
 
-		// finally if we are closed normally and we are operating on
-		// event or ingestion time, emit the max watermark indicating
-		// the end of the stream, like a normal source would do.
+		// finally if we are operating on event or ingestion time,
+		// emit the long-max watermark indicating the end of the stream,
+		// like a normal source would do.
 
 		if (readerContext != null) {
 			readerContext.emitWatermark(Watermark.MAX_WATERMARK);
@@ -188,6 +182,8 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
 
 	private class SplitReader<OT> extends Thread {
 
+		private volatile boolean isClosed;
+
 		private volatile boolean isRunning;
 
 		private final FileInputFormat<OT> format;
@@ -213,14 +209,10 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
 			this.readerContext = checkNotNull(readerContext, "Unspecified Reader Context.");
 			this.checkpointLock = checkNotNull(checkpointLock, "Unspecified checkpoint lock.");
 
+			this.isClosed = false;
 			this.isRunning = true;
 
-			this.pendingSplits = new PriorityQueue<>(10, new Comparator<TimestampedFileInputSplit>() {
-				@Override
-				public int compare(TimestampedFileInputSplit o1, TimestampedFileInputSplit o2) {
-					return o1.compareTo(o2);
-				}
-			});
+			this.pendingSplits = new PriorityQueue<>();
 
 			// this is the case where a task recovers from a previous failed attempt
 			if (restoredState != null) {
@@ -252,17 +244,21 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
 
 						if (currentSplit == null) {
 							currentSplit = this.pendingSplits.poll();
+
+							// if the list of pending splits is empty (currentSplit == null) then:
+							//   1) if close() was called on the operator then exit the while loop
+							//   2) if not wait 50 ms and try again to fetch a new split to read
+
 							if (currentSplit == null) {
-								checkpointLock.wait(50);
+								if (!this.isClosed) {
+									checkpointLock.wait(50);
+								} else {
+									isRunning = false;
+								}
 								continue;
 							}
 						}
 
-						if (currentSplit.equals(EOS)) {
-							isRunning = false;
-							break;
-						}
-
 						if (this.format instanceof CheckpointableInputFormat && currentSplit.getSplitState() != null) {
 							// recovering after a node failure with an input
 							// format that supports resetting the offset
@@ -332,7 +328,8 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
 			List<TimestampedFileInputSplit> snapshot = new ArrayList<>(this.pendingSplits.size());
 			if (currentSplit != null ) {
 				if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) {
-					Serializable formatState = ((CheckpointableInputFormat<TimestampedFileInputSplit, Serializable>) this.format).getCurrentState();
+					Serializable formatState =
+						((CheckpointableInputFormat<TimestampedFileInputSplit, Serializable>) this.format).getCurrentState();
 					this.currentSplit.setSplitState(formatState);
 				}
 				snapshot.add(this.currentSplit);
@@ -344,6 +341,10 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
 		public void cancel() {
 			this.isRunning = false;
 		}
+
+		public void close() {
+			this.isClosed = true;
+		}
 	}
 
 	//	---------------------			Checkpointing			--------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/98a61762/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
index 6a3ba0d..2a0be98 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
@@ -44,10 +44,6 @@ public class TimestampedFileInputSplit extends FileInputSplit implements Compara
 	 * */
 	private Serializable splitState;
 
-	/** A special {@link TimestampedFileInputSplit} signaling the end of the stream of splits.*/
-	public static final TimestampedFileInputSplit EOS =
-		new TimestampedFileInputSplit(Long.MAX_VALUE, -1, null, -1, -1, null);
-
 	/**
 	 * Creates a {@link TimestampedFileInputSplit} based on the file modification time and
 	 * the rest of the information of the {@link FileInputSplit}, as returned by the
@@ -101,24 +97,23 @@ public class TimestampedFileInputSplit extends FileInputSplit implements Compara
 
 	@Override
 	public int compareTo(TimestampedFileInputSplit o) {
-		long modTimeComp = this.modificationTime - o.modificationTime;
+		int modTimeComp = Long.compare(this.modificationTime, o.modificationTime);
 		if (modTimeComp != 0L) {
-			// we cannot just cast the modTimeComp to int
-			// because it may overflow
-			return modTimeComp > 0 ? 1 : -1;
+			return modTimeComp;
 		}
 
-		// the file input split allows for null paths
-		if (this.getPath() == o.getPath()) {
-			return 0;
-		} else if (this.getPath() == null) {
+		// the file input split does not prevent null paths.
+		if (this.getPath() == null && o.getPath() != null) {
 			return 1;
-		} else if (o.getPath() == null) {
+		} else if (this.getPath() != null && o.getPath() == null) {
 			return -1;
 		}
 
-		int pathComp = this.getPath().compareTo(o.getPath());
-		return pathComp != 0 ? pathComp : this.getSplitNumber() - o.getSplitNumber();
+		int pathComp = this.getPath() == o.getPath() ? 0 :
+			this.getPath().compareTo(o.getPath());
+
+		return pathComp != 0 ? pathComp :
+			this.getSplitNumber() - o.getSplitNumber();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/98a61762/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
index 9dc90d3..0a89ab9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
@@ -23,7 +23,6 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Queue;
@@ -33,14 +32,8 @@ public class TimestampedFileInputSplitTest {
 	@Test
 	public void testSplitEquality() {
 
-		TimestampedFileInputSplit eos1 = TimestampedFileInputSplit.EOS;
-		TimestampedFileInputSplit eos2 = TimestampedFileInputSplit.EOS;
-
-		Assert.assertEquals(eos1, eos2);
-
 		TimestampedFileInputSplit richFirstSplit =
 			new TimestampedFileInputSplit(10, 2, new Path("test"), 0, 100, null);
-		Assert.assertNotEquals(eos1, richFirstSplit);
 
 		TimestampedFileInputSplit richSecondSplit =
 			new TimestampedFileInputSplit(10, 2, new Path("test"), 0, 100, null);
@@ -88,18 +81,6 @@ public class TimestampedFileInputSplitTest {
 
 		// smaller modification time first
 		Assert.assertTrue(richThirdSplit.compareTo(richForthSplit) < 0);
-
-		Assert.assertTrue(richFirstSplit.compareTo(TimestampedFileInputSplit.EOS) < 0);
-		Assert.assertTrue(richSecondSplit.compareTo(TimestampedFileInputSplit.EOS) < 0);
-		Assert.assertTrue(richThirdSplit.compareTo(TimestampedFileInputSplit.EOS) < 0);
-		Assert.assertTrue(richForthSplit.compareTo(TimestampedFileInputSplit.EOS) < 0);
-
-		Assert.assertEquals(0, TimestampedFileInputSplit.EOS.compareTo(TimestampedFileInputSplit.EOS));
-
-		Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richFirstSplit) > 0);
-		Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richSecondSplit) > 0);
-		Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richThirdSplit) > 0);
-		Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richForthSplit) > 0);
 	}
 
 	@Test
@@ -130,14 +111,10 @@ public class TimestampedFileInputSplitTest {
 		TimestampedFileInputSplit richFifthSplit =
 			new TimestampedFileInputSplit(11, 1, new Path("test/test3"), 0, 100, null);
 
-		TimestampedFileInputSplit eos = TimestampedFileInputSplit.EOS;
-
 		Queue<TimestampedFileInputSplit> pendingSplits = new PriorityQueue<>();
 
-		pendingSplits.add(eos);
 		pendingSplits.add(richSecondSplit);
 		pendingSplits.add(richForthSplit);
-		pendingSplits.add(eos);
 		pendingSplits.add(richFirstSplit);
 		pendingSplits.add(richFifthSplit);
 		pendingSplits.add(richFifthSplit);
@@ -158,8 +135,6 @@ public class TimestampedFileInputSplitTest {
 		expectedSortedSplits.add(richForthSplit);
 		expectedSortedSplits.add(richFifthSplit);
 		expectedSortedSplits.add(richFifthSplit);
-		expectedSortedSplits.add(eos);
-		expectedSortedSplits.add(eos);
 
 		Assert.assertArrayEquals(expectedSortedSplits.toArray(), actualSortedSplits.toArray());
 	}