You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/06/17 11:33:26 UTC

flink git commit: [FLINK-4075] ContinuousFileProcessingCheckpointITCase failed on Travis

Repository: flink
Updated Branches:
  refs/heads/master ec6d97528 -> fc4abd7ff


[FLINK-4075] ContinuousFileProcessingCheckpointITCase failed on Travis


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc4abd7f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc4abd7f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc4abd7f

Branch: refs/heads/master
Commit: fc4abd7fff5fa9bbfbd2196e61bf696a1dd57ad7
Parents: ec6d975
Author: kl0u <kk...@gmail.com>
Authored: Thu Jun 16 18:18:28 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Jun 17 13:32:02 2016 +0200

----------------------------------------------------------------------
 .../flink/api/java/io/AvroInputFormat.java      | 23 +++++++++++---------
 .../flink/api/common/io/BinaryInputFormat.java  | 17 +++++++++------
 .../api/common/io/DelimitedInputFormat.java     | 12 +++++++---
 .../source/ContinuousFileReaderOperator.java    | 16 +++++++++++---
 4 files changed, 45 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fc4abd7f/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
index a920275..73067c1 100644
--- a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
+++ b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
@@ -68,7 +68,7 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
 
 	private transient long recordsReadSinceLastSync;
 
-	private transient long lastSync = -1l;
+	private long lastSync = -1l;
 
 	public AvroInputFormat(Path filePath, Class<E> type) {
 		super(filePath);
@@ -186,18 +186,21 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
 		Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
 		Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
 
-		this.open(split);
-		if (state.f0 != -1) {
+		try {
+			this.open(split);
+		} finally {
+			if (state.f0 != -1) {
+				lastSync = state.f0;
+				recordsReadSinceLastSync = state.f1;
+			}
+		}
 
-			// go to the block we stopped
-			lastSync = state.f0;
+		if (lastSync != -1) {
+			// seek to the last sync point, then read and discard the
+			// records that were already consumed before the checkpoint
 			dataFileReader.seek(lastSync);
-
-			// read until the record we were before the checkpoint and discard the values
-			long recordsToDiscard = state.f1;
-			for(int i = 0; i < recordsToDiscard; i++) {
+			for(int i = 0; i < recordsReadSinceLastSync; i++) {
 				dataFileReader.next(null);
-				recordsReadSinceLastSync++;
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4abd7f/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
index eb83bda..96e0e0d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
@@ -390,14 +390,17 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
 		Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
 		Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
 
-		this.open(split);
-		this.blockInfo = this.createAndReadBlockInfo();
+		try {
+			this.open(split);
+		} finally {
+			this.blockInfo = this.createAndReadBlockInfo();
 
-		long blockPos = state.f0;
-		this.readRecords = state.f1;
+			long blockPos = state.f0;
+			this.readRecords = state.f1;
 
-		this.stream.seek(this.splitStart + blockPos);
-		this.blockBasedInput = new BlockBasedInput(this.stream, (int) blockPos, this.splitLength);
-		this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput);
+			this.stream.seek(this.splitStart + blockPos);
+			this.blockBasedInput = new BlockBasedInput(this.stream, (int) blockPos, this.splitLength);
+			this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4abd7f/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 3a77200..4cd200d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -144,7 +144,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 
 	private transient boolean end;
 
-	private transient long offset = -1;
+	private long offset = -1;
 
 	// --------------------------------------------------------------------------------------------
 	//  The configuration parameters. Configured on the instance and serialized to be shipped.
@@ -638,9 +638,15 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 	public void reopen(FileInputSplit split, Long state) throws IOException {
 		Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
 		Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
+		Preconditions.checkArgument(state == -1 || state >= split.getStart(),
+			" Illegal offset "+ state +", smaller than the splits start=" + split.getStart());
+
+		try {
+			this.open(split);
+		} finally {
+			this.offset = state;
+		}
 
-		this.open(split);
-		this.offset = state;
 		if (state > this.splitStart + split.getLength()) {
 			this.end = true;
 		} else if (state > split.getStart()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4abd7f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index e26c534..9319338 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -188,6 +188,8 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 
 		private S restoredFormatState = null;
 
+		private volatile boolean isSplitOpen = false;
+
 		SplitReader(FileInputFormat<OT> format,
 					TypeSerializer<OT> serializer,
 					TimestampedCollector<OT> collector,
@@ -271,6 +273,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 							}
 							this.format.open(currentSplit);
 						}
+						this.isSplitOpen = true;
 					}
 
 					LOG.info("Reading split: " + currentSplit);
@@ -290,8 +293,11 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 
 					} finally {
 						// close and prepare for the next iteration
-						this.format.close();
-						this.currentSplit = null;
+						synchronized (checkpointLock) {
+							this.format.close();
+							this.isSplitOpen = false;
+							this.currentSplit = null;
+						}
 					}
 				}
 
@@ -303,8 +309,12 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 			} finally {
 				synchronized (checkpointLock) {
 					LOG.info("Reader terminated, and exiting...");
+
 					this.format.closeInputFormat();
+					this.isSplitOpen = false;
+					this.currentSplit = null;
 					this.isRunning = false;
+
 					checkpointLock.notifyAll();
 				}
 			}
@@ -321,7 +331,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 				this.pendingSplits.remove();
 			}
 
-			if (this.format instanceof CheckpointableInputFormat) {
+			if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) {
 				S formatState = (S) ((CheckpointableInputFormat) format).getCurrentState();
 				return new Tuple3<>(snapshot, currentSplit, currentSplit == null ? null : formatState);
 			} else {