You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/11 13:09:12 UTC
[3/3] flink git commit: [FLINK-5021] Remove the special EOS
TimestampedFileInputSplit.
[FLINK-5021] Remove the special EOS TimestampedFileInputSplit.
With the special split that signaled "no more splits will arrive"
removed, ContinuousFileReaderOperator now closes its split reader by
setting a flag that marks the reader as closed; the reader exits once
this flag is set and its queue of pending splits is empty.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98a61762
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98a61762
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98a61762
Branch: refs/heads/master
Commit: 98a6176280dd7b85dcd6fbacd324eb1056e23419
Parents: 9918839
Author: kl0u <kk...@gmail.com>
Authored: Thu Nov 3 10:50:04 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Nov 11 14:05:58 2016 +0100
----------------------------------------------------------------------
.../source/ContinuousFileReaderOperator.java | 53 ++++++++++----------
.../source/TimestampedFileInputSplit.java | 25 ++++-----
.../TimestampedFileInputSplitTest.java | 25 ---------
3 files changed, 37 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/98a61762/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 19e4737..db8e8fd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -42,12 +42,10 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
-import static org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit.EOS;
import static org.apache.flink.util.Preconditions.checkState;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -162,22 +160,18 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
public void close() throws Exception {
super.close();
- // signal that no more splits will come, wait for the reader to finish
- // and close the collector. Further cleaning up is handled by the dispose().
+ // close the reader to signal that no more splits will come. By doing this,
+ // the reader will exit as soon as it finishes processing the already pending splits.
+ // This method will wait until then. Further cleaning up is handled by the dispose().
if (reader != null && reader.isAlive() && reader.isRunning()) {
- // add a dummy element to signal that no more splits will
- // arrive and wait until the reader finishes
- reader.addSplit(EOS);
-
- // we already have the checkpoint lock because close() is
- // called by the StreamTask while having it.
+ reader.close();
checkpointLock.wait();
}
- // finally if we are closed normally and we are operating on
- // event or ingestion time, emit the max watermark indicating
- // the end of the stream, like a normal source would do.
+ // finally if we are operating on event or ingestion time,
+ // emit the long-max watermark indicating the end of the stream,
+ // like a normal source would do.
if (readerContext != null) {
readerContext.emitWatermark(Watermark.MAX_WATERMARK);
@@ -188,6 +182,8 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
private class SplitReader<OT> extends Thread {
+ private volatile boolean isClosed;
+
private volatile boolean isRunning;
private final FileInputFormat<OT> format;
@@ -213,14 +209,10 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
this.readerContext = checkNotNull(readerContext, "Unspecified Reader Context.");
this.checkpointLock = checkNotNull(checkpointLock, "Unspecified checkpoint lock.");
+ this.isClosed = false;
this.isRunning = true;
- this.pendingSplits = new PriorityQueue<>(10, new Comparator<TimestampedFileInputSplit>() {
- @Override
- public int compare(TimestampedFileInputSplit o1, TimestampedFileInputSplit o2) {
- return o1.compareTo(o2);
- }
- });
+ this.pendingSplits = new PriorityQueue<>();
// this is the case where a task recovers from a previous failed attempt
if (restoredState != null) {
@@ -252,17 +244,21 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
if (currentSplit == null) {
currentSplit = this.pendingSplits.poll();
+
+ // if the list of pending splits is empty (currentSplit == null) then:
+ // 1) if close() was called on the operator then exit the while loop
+ // 2) if not wait 50 ms and try again to fetch a new split to read
+
if (currentSplit == null) {
- checkpointLock.wait(50);
+ if (!this.isClosed) {
+ checkpointLock.wait(50);
+ } else {
+ isRunning = false;
+ }
continue;
}
}
- if (currentSplit.equals(EOS)) {
- isRunning = false;
- break;
- }
-
if (this.format instanceof CheckpointableInputFormat && currentSplit.getSplitState() != null) {
// recovering after a node failure with an input
// format that supports resetting the offset
@@ -332,7 +328,8 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
List<TimestampedFileInputSplit> snapshot = new ArrayList<>(this.pendingSplits.size());
if (currentSplit != null ) {
if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) {
- Serializable formatState = ((CheckpointableInputFormat<TimestampedFileInputSplit, Serializable>) this.format).getCurrentState();
+ Serializable formatState =
+ ((CheckpointableInputFormat<TimestampedFileInputSplit, Serializable>) this.format).getCurrentState();
this.currentSplit.setSplitState(formatState);
}
snapshot.add(this.currentSplit);
@@ -344,6 +341,10 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
public void cancel() {
this.isRunning = false;
}
+
+ public void close() {
+ this.isClosed = true;
+ }
}
// --------------------- Checkpointing --------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/98a61762/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
index 6a3ba0d..2a0be98 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
@@ -44,10 +44,6 @@ public class TimestampedFileInputSplit extends FileInputSplit implements Compara
* */
private Serializable splitState;
- /** A special {@link TimestampedFileInputSplit} signaling the end of the stream of splits.*/
- public static final TimestampedFileInputSplit EOS =
- new TimestampedFileInputSplit(Long.MAX_VALUE, -1, null, -1, -1, null);
-
/**
* Creates a {@link TimestampedFileInputSplit} based on the file modification time and
* the rest of the information of the {@link FileInputSplit}, as returned by the
@@ -101,24 +97,23 @@ public class TimestampedFileInputSplit extends FileInputSplit implements Compara
@Override
public int compareTo(TimestampedFileInputSplit o) {
- long modTimeComp = this.modificationTime - o.modificationTime;
+ int modTimeComp = Long.compare(this.modificationTime, o.modificationTime);
if (modTimeComp != 0L) {
- // we cannot just cast the modTimeComp to int
- // because it may overflow
- return modTimeComp > 0 ? 1 : -1;
+ return modTimeComp;
}
- // the file input split allows for null paths
- if (this.getPath() == o.getPath()) {
- return 0;
- } else if (this.getPath() == null) {
+ // the file input split does not prevent null paths; order them after non-null paths.
+ if (this.getPath() == null && o.getPath() != null) {
return 1;
- } else if (o.getPath() == null) {
+ } else if (this.getPath() != null && o.getPath() == null) {
return -1;
}
- int pathComp = this.getPath().compareTo(o.getPath());
- return pathComp != 0 ? pathComp : this.getSplitNumber() - o.getSplitNumber();
+ int pathComp = this.getPath() == o.getPath() ? 0 :
+ this.getPath().compareTo(o.getPath());
+
+ return pathComp != 0 ? pathComp :
+ Integer.compare(this.getSplitNumber(), o.getSplitNumber());
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/98a61762/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
index 9dc90d3..0a89ab9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
@@ -23,7 +23,6 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
-import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
@@ -33,14 +32,8 @@ public class TimestampedFileInputSplitTest {
@Test
public void testSplitEquality() {
- TimestampedFileInputSplit eos1 = TimestampedFileInputSplit.EOS;
- TimestampedFileInputSplit eos2 = TimestampedFileInputSplit.EOS;
-
- Assert.assertEquals(eos1, eos2);
-
TimestampedFileInputSplit richFirstSplit =
new TimestampedFileInputSplit(10, 2, new Path("test"), 0, 100, null);
- Assert.assertNotEquals(eos1, richFirstSplit);
TimestampedFileInputSplit richSecondSplit =
new TimestampedFileInputSplit(10, 2, new Path("test"), 0, 100, null);
@@ -88,18 +81,6 @@ public class TimestampedFileInputSplitTest {
// smaller modification time first
Assert.assertTrue(richThirdSplit.compareTo(richForthSplit) < 0);
-
- Assert.assertTrue(richFirstSplit.compareTo(TimestampedFileInputSplit.EOS) < 0);
- Assert.assertTrue(richSecondSplit.compareTo(TimestampedFileInputSplit.EOS) < 0);
- Assert.assertTrue(richThirdSplit.compareTo(TimestampedFileInputSplit.EOS) < 0);
- Assert.assertTrue(richForthSplit.compareTo(TimestampedFileInputSplit.EOS) < 0);
-
- Assert.assertEquals(0, TimestampedFileInputSplit.EOS.compareTo(TimestampedFileInputSplit.EOS));
-
- Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richFirstSplit) > 0);
- Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richSecondSplit) > 0);
- Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richThirdSplit) > 0);
- Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richForthSplit) > 0);
}
@Test
@@ -130,14 +111,10 @@ public class TimestampedFileInputSplitTest {
TimestampedFileInputSplit richFifthSplit =
new TimestampedFileInputSplit(11, 1, new Path("test/test3"), 0, 100, null);
- TimestampedFileInputSplit eos = TimestampedFileInputSplit.EOS;
-
Queue<TimestampedFileInputSplit> pendingSplits = new PriorityQueue<>();
- pendingSplits.add(eos);
pendingSplits.add(richSecondSplit);
pendingSplits.add(richForthSplit);
- pendingSplits.add(eos);
pendingSplits.add(richFirstSplit);
pendingSplits.add(richFifthSplit);
pendingSplits.add(richFifthSplit);
@@ -158,8 +135,6 @@ public class TimestampedFileInputSplitTest {
expectedSortedSplits.add(richForthSplit);
expectedSortedSplits.add(richFifthSplit);
expectedSortedSplits.add(richFifthSplit);
- expectedSortedSplits.add(eos);
- expectedSortedSplits.add(eos);
Assert.assertArrayEquals(expectedSortedSplits.toArray(), actualSortedSplits.toArray());
}