You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/07 11:12:41 UTC

[1/3] incubator-flink git commit: [FLINK-1222] Tasks send close acknowledgements early.

Repository: incubator-flink
Updated Branches:
  refs/heads/master a959dd503 -> e58049711


[FLINK-1222] Tasks send close acknowledgements early.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ef406916
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ef406916
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ef406916

Branch: refs/heads/master
Commit: ef406916dbeabaef79b4ffd38fe5916cdd34bd2f
Parents: a959dd5
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 6 15:14:12 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 6 15:14:12 2014 +0100

----------------------------------------------------------------------
 .../io/network/channels/InputChannel.java       | 34 +++++++++++++++-----
 1 file changed, 26 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ef406916/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/InputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/InputChannel.java
index 1d14172..80181be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/InputChannel.java
@@ -81,7 +81,9 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
 	 */
 	private long amountOfDataTransmitted;
 
-	private volatile boolean brokerAggreedToCloseChannel;
+	private volatile boolean weClosedChannel;
+	
+	private volatile boolean senderClosedChannel;
 
 	// -------------------------------------------------------------------------------------------
 
@@ -158,7 +160,12 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
 
 				AbstractEvent evt = boe.getEvent();
 				if (evt.getClass() == ChannelCloseEvent.class) {
-					this.brokerAggreedToCloseChannel = true;
+					this.senderClosedChannel = true;
+					try {
+						close();
+					} catch (InterruptedException e) {
+						throw new IOException(e);
+					}
 					return InputChannelResult.END_OF_STREAM;
 				}
 				else if (evt.getClass() == EndOfSuperstepEvent.class) {
@@ -207,11 +214,16 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
 		if (this.ioException != null) {
 			throw new IOException("An error occurred in the channel: " + this.ioException.getMessage(), this.ioException);
 		} else {
-			return this.brokerAggreedToCloseChannel;
+			return this.weClosedChannel && this.senderClosedChannel;
 		}
 	}
 
 	public void close() throws IOException, InterruptedException {
+		
+		if (weClosedChannel) {
+			return;
+		}
+		weClosedChannel = true;
 
 		this.deserializer.clear();
 		if (this.dataBuffer != null) {
@@ -220,13 +232,13 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
 		}
 
 		// This code fragment makes sure the isClosed method works in case the channel input has not been fully consumed
-		while (!this.brokerAggreedToCloseChannel)
+		while (!this.senderClosedChannel)
 		{
 			BufferOrEvent next = getNextBufferOrEvent();
 			if (next != null) {
 				if (next.isEvent()) {
 					if (next.getEvent() instanceof ChannelCloseEvent) {
-						this.brokerAggreedToCloseChannel = true;
+						this.senderClosedChannel = true;
 					}
 				} else {
 					releasedConsumedReadBuffer(next.getBuffer());
@@ -266,8 +278,14 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
 
 	@Override
 	public void releaseAllResources() {
-		this.brokerAggreedToCloseChannel = true;
+		this.senderClosedChannel = true;
 		this.deserializer.clear();
+		
+		Buffer buf = this.dataBuffer;
+		if (buf != null) {
+			buf.recycleBuffer();
+			dataBuffer = null;
+		}
 
 		// The buffers are recycled by the input channel wrapper
 	}
@@ -310,8 +328,8 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
 				// notify that something (an exception) is available
 				notifyGateThatInputIsAvailable();
 
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Input channel " + this.toString() + " expected envelope " + expectedSequenceNumber
+				if (LOG.isErrorEnabled()) {
+					LOG.error("Input channel " + this.toString() + " expected envelope " + expectedSequenceNumber
 							+ " but received " + sequenceNumber);
 				}
 


[3/3] incubator-flink git commit: [FLINK-1215] Fix spurious failures when creating output directories due to I/O races

Posted by se...@apache.org.
[FLINK-1215] Fix spurious failures when creating output directories due to I/O races


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e5804971
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e5804971
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e5804971

Branch: refs/heads/master
Commit: e58049711e4275d86197223b7efcb47d2f801244
Parents: ef9a373
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 6 20:33:38 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 6 20:36:46 2014 +0100

----------------------------------------------------------------------
 .../org/apache/flink/core/fs/FileSystem.java    | 62 +++++++++++++++-----
 1 file changed, 46 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5804971/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 1b7b91e..cb3a751 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -517,24 +517,54 @@ public abstract class FileSystem {
 		
 		if (createDirectory) {
 			// Output directory needs to be created
-			try {
-				if(!this.exists(outPath)) {
-					this.mkdirs(outPath);
+			
+			// NOTE: we sometimes see this code block fail due to a race:
+			// - the check whether the directory exists returns false
+			// - the call to create the directory fails (some concurrent thread is creating the directory)
+			// - the call to check whether the directory exists does not yet see the new directory
+			
+			// try for 30 seconds
+			long now = System.currentTimeMillis();
+			long deadline = now + 30000;
+			
+			do {
+				try {
+					if(!this.exists(outPath)) {
+						this.mkdirs(outPath);
+					}
 				}
-			} catch(IOException ioe) {
-				// Some other thread might already have created the directory.
-				// If - for some other reason - the directory could not be created  
-				// and the path does not exist, this will be handled later.
-			}
-	
-			// double check that the output directory exists
-			try {
-				FileStatus check = getFileStatus(outPath);
-				return check.isDir();
-			} catch (FileNotFoundException e) {
-				return false;
+				catch (IOException ioe) {
+					// Some other thread might already have created the directory.
+					// If - for some other reason - the directory could not be created  
+					// and the path does not exist, this will be handled later.
+				}
+		
+				// double check that the output directory exists
+				try {
+					FileStatus check = getFileStatus(outPath);
+					if (check != null) {
+						if (check.isDir()) {
+							return true;
+						} else {
+							throw new IOException("FileSystem should create an output directory, but the path points to a file instead.");
+						}
+					}
+					// else: fall through the loop
+				}
+				catch (FileNotFoundException e) {
+				// fall through the loop
+				}
+				
+				// delay to allow other threads to make progress in the I/O calls
+				try {
+					Thread.sleep(1000);
+				} catch (InterruptedException ie) {}
 			}
-		} else {
+			while (System.currentTimeMillis() < deadline);
+			
+			return false;
+		}
+		else {
 			// check that the output path does not exist and an output file can be created by the output format.
 			return !this.exists(outPath);
 		}


[2/3] incubator-flink git commit: [runtime] In local mode, make sure taskmanagers have completed registration before starting a job.

Posted by se...@apache.org.
[runtime] In local mode, make sure taskmanagers have completed registration before starting a job.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ef9a3739
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ef9a3739
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ef9a3739

Branch: refs/heads/master
Commit: ef9a37390dfbd325b3bf2422334f99d22fca2a1a
Parents: ef40691
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 6 19:14:13 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 6 19:14:13 2014 +0100

----------------------------------------------------------------------
 .../apache/flink/client/minicluster/NepheleMiniCluster.java | 9 +++++++++
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ef9a3739/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
index e04006c..aac0786 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
@@ -267,6 +267,15 @@ public class NepheleMiniCluster {
 		while (jobManager.getNumberOfSlotsAvailableToScheduler() < numSlots) {
 			Thread.sleep(50);
 		}
+		
+		// make sure that not just the jobmanager has the slots, but also the taskmanager
+		// has figured out its registration. under rare races, calls can be scheduled before that otherwise
+		TaskManager[] tms = getTaskManagers();
+		for (TaskManager tm : tms) {
+			while (tm.getRegisteredId() == null) {
+				Thread.sleep(10);
+			}
+		}
 	}
 	
 	private static void initializeIOFormatClasses() {