You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/06/17 11:33:26 UTC
flink git commit: [FLINK-4075]
ContinuousFileProcessingCheckpointITCase failed on Travis
Repository: flink
Updated Branches:
refs/heads/master ec6d97528 -> fc4abd7ff
[FLINK-4075] ContinuousFileProcessingCheckpointITCase failed on Travis
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc4abd7f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc4abd7f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc4abd7f
Branch: refs/heads/master
Commit: fc4abd7fff5fa9bbfbd2196e61bf696a1dd57ad7
Parents: ec6d975
Author: kl0u <kk...@gmail.com>
Authored: Thu Jun 16 18:18:28 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Jun 17 13:32:02 2016 +0200
----------------------------------------------------------------------
.../flink/api/java/io/AvroInputFormat.java | 23 +++++++++++---------
.../flink/api/common/io/BinaryInputFormat.java | 17 +++++++++------
.../api/common/io/DelimitedInputFormat.java | 12 +++++++---
.../source/ContinuousFileReaderOperator.java | 16 +++++++++++---
4 files changed, 45 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fc4abd7f/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
index a920275..73067c1 100644
--- a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
+++ b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
@@ -68,7 +68,7 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
private transient long recordsReadSinceLastSync;
- private transient long lastSync = -1l;
+ private long lastSync = -1l;
public AvroInputFormat(Path filePath, Class<E> type) {
super(filePath);
@@ -186,18 +186,21 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
- this.open(split);
- if (state.f0 != -1) {
+ try {
+ this.open(split);
+ } finally {
+ if (state.f0 != -1) {
+ lastSync = state.f0;
+ recordsReadSinceLastSync = state.f1;
+ }
+ }
- // go to the block we stopped
- lastSync = state.f0;
+ if (lastSync != -1) {
+ // open and read until the record we were before
+ // the checkpoint and discard the values
dataFileReader.seek(lastSync);
-
- // read until the record we were before the checkpoint and discard the values
- long recordsToDiscard = state.f1;
- for(int i = 0; i < recordsToDiscard; i++) {
+ for(int i = 0; i < recordsReadSinceLastSync; i++) {
dataFileReader.next(null);
- recordsReadSinceLastSync++;
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc4abd7f/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
index eb83bda..96e0e0d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
@@ -390,14 +390,17 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
- this.open(split);
- this.blockInfo = this.createAndReadBlockInfo();
+ try {
+ this.open(split);
+ } finally {
+ this.blockInfo = this.createAndReadBlockInfo();
- long blockPos = state.f0;
- this.readRecords = state.f1;
+ long blockPos = state.f0;
+ this.readRecords = state.f1;
- this.stream.seek(this.splitStart + blockPos);
- this.blockBasedInput = new BlockBasedInput(this.stream, (int) blockPos, this.splitLength);
- this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput);
+ this.stream.seek(this.splitStart + blockPos);
+ this.blockBasedInput = new BlockBasedInput(this.stream, (int) blockPos, this.splitLength);
+ this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc4abd7f/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 3a77200..4cd200d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -144,7 +144,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
private transient boolean end;
- private transient long offset = -1;
+ private long offset = -1;
// --------------------------------------------------------------------------------------------
// The configuration parameters. Configured on the instance and serialized to be shipped.
@@ -638,9 +638,15 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
public void reopen(FileInputSplit split, Long state) throws IOException {
Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
+ Preconditions.checkArgument(state == -1 || state >= split.getStart(),
+ " Illegal offset "+ state +", smaller than the splits start=" + split.getStart());
+
+ try {
+ this.open(split);
+ } finally {
+ this.offset = state;
+ }
- this.open(split);
- this.offset = state;
if (state > this.splitStart + split.getLength()) {
this.end = true;
} else if (state > split.getStart()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/fc4abd7f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index e26c534..9319338 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -188,6 +188,8 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
private S restoredFormatState = null;
+ private volatile boolean isSplitOpen = false;
+
SplitReader(FileInputFormat<OT> format,
TypeSerializer<OT> serializer,
TimestampedCollector<OT> collector,
@@ -271,6 +273,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
}
this.format.open(currentSplit);
}
+ this.isSplitOpen = true;
}
LOG.info("Reading split: " + currentSplit);
@@ -290,8 +293,11 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
} finally {
// close and prepare for the next iteration
- this.format.close();
- this.currentSplit = null;
+ synchronized (checkpointLock) {
+ this.format.close();
+ this.isSplitOpen = false;
+ this.currentSplit = null;
+ }
}
}
@@ -303,8 +309,12 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
} finally {
synchronized (checkpointLock) {
LOG.info("Reader terminated, and exiting...");
+
this.format.closeInputFormat();
+ this.isSplitOpen = false;
+ this.currentSplit = null;
this.isRunning = false;
+
checkpointLock.notifyAll();
}
}
@@ -321,7 +331,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
this.pendingSplits.remove();
}
- if (this.format instanceof CheckpointableInputFormat) {
+ if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) {
S formatState = (S) ((CheckpointableInputFormat) format).getCurrentState();
return new Tuple3<>(snapshot, currentSplit, currentSplit == null ? null : formatState);
} else {