You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/07 11:12:41 UTC
[1/3] incubator-flink git commit: [FLINK-1222] Tasks send close acknowledgements early.
Repository: incubator-flink
Updated Branches:
refs/heads/master a959dd503 -> e58049711
[FLINK-1222] Tasks send close acknowledgements early.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ef406916
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ef406916
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ef406916
Branch: refs/heads/master
Commit: ef406916dbeabaef79b4ffd38fe5916cdd34bd2f
Parents: a959dd5
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 6 15:14:12 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 6 15:14:12 2014 +0100
----------------------------------------------------------------------
.../io/network/channels/InputChannel.java | 34 +++++++++++++++-----
1 file changed, 26 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ef406916/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/InputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/InputChannel.java
index 1d14172..80181be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/InputChannel.java
@@ -81,7 +81,9 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
*/
private long amountOfDataTransmitted;
- private volatile boolean brokerAggreedToCloseChannel;
+ private volatile boolean weClosedChannel;
+
+ private volatile boolean senderClosedChannel;
// -------------------------------------------------------------------------------------------
@@ -158,7 +160,12 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
AbstractEvent evt = boe.getEvent();
if (evt.getClass() == ChannelCloseEvent.class) {
- this.brokerAggreedToCloseChannel = true;
+ this.senderClosedChannel = true;
+ try {
+ close();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
return InputChannelResult.END_OF_STREAM;
}
else if (evt.getClass() == EndOfSuperstepEvent.class) {
@@ -207,11 +214,16 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
if (this.ioException != null) {
throw new IOException("An error occurred in the channel: " + this.ioException.getMessage(), this.ioException);
} else {
- return this.brokerAggreedToCloseChannel;
+ return this.weClosedChannel && this.senderClosedChannel;
}
}
public void close() throws IOException, InterruptedException {
+
+ if (weClosedChannel) {
+ return;
+ }
+ weClosedChannel = true;
this.deserializer.clear();
if (this.dataBuffer != null) {
@@ -220,13 +232,13 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
}
// This code fragment makes sure the isClosed method works in case the channel input has not been fully consumed
- while (!this.brokerAggreedToCloseChannel)
+ while (!this.senderClosedChannel)
{
BufferOrEvent next = getNextBufferOrEvent();
if (next != null) {
if (next.isEvent()) {
if (next.getEvent() instanceof ChannelCloseEvent) {
- this.brokerAggreedToCloseChannel = true;
+ this.senderClosedChannel = true;
}
} else {
releasedConsumedReadBuffer(next.getBuffer());
@@ -266,8 +278,14 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
@Override
public void releaseAllResources() {
- this.brokerAggreedToCloseChannel = true;
+ this.senderClosedChannel = true;
this.deserializer.clear();
+
+ Buffer buf = this.dataBuffer;
+ if (buf != null) {
+ buf.recycleBuffer();
+ dataBuffer = null;
+ }
// The buffers are recycled by the input channel wrapper
}
@@ -310,8 +328,8 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
// notify that something (an exception) is available
notifyGateThatInputIsAvailable();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Input channel " + this.toString() + " expected envelope " + expectedSequenceNumber
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Input channel " + this.toString() + " expected envelope " + expectedSequenceNumber
+ " but received " + sequenceNumber);
}
[3/3] incubator-flink git commit: [FLINK-1215] Fix spurious failures when creating output directories due to I/O races
Posted by se...@apache.org.
[FLINK-1215] Fix spurious failures when creating output directories due to I/O races
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e5804971
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e5804971
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e5804971
Branch: refs/heads/master
Commit: e58049711e4275d86197223b7efcb47d2f801244
Parents: ef9a373
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 6 20:33:38 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 6 20:36:46 2014 +0100
----------------------------------------------------------------------
.../org/apache/flink/core/fs/FileSystem.java | 62 +++++++++++++++-----
1 file changed, 46 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5804971/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 1b7b91e..cb3a751 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -517,24 +517,54 @@ public abstract class FileSystem {
if (createDirectory) {
// Output directory needs to be created
- try {
- if(!this.exists(outPath)) {
- this.mkdirs(outPath);
+
+ // NOTE: we sometimes see this code block fail due to a race:
+ // - the check whether the directory exists returns false
+ // - the call to create the directory fails (some concurrent thread is creating the directory)
+ // - the call to check whether the directory exists does not yet see the new directory
+
+ // try for 30 seconds
+ long now = System.currentTimeMillis();
+ long deadline = now + 30000;
+
+ do {
+ try {
+ if(!this.exists(outPath)) {
+ this.mkdirs(outPath);
+ }
}
- } catch(IOException ioe) {
- // Some other thread might already have created the directory.
- // If - for some other reason - the directory could not be created
- // and the path does not exist, this will be handled later.
- }
-
- // double check that the output directory exists
- try {
- FileStatus check = getFileStatus(outPath);
- return check.isDir();
- } catch (FileNotFoundException e) {
- return false;
+ catch (IOException ioe) {
+ // Some other thread might already have created the directory.
+ // If - for some other reason - the directory could not be created
+ // and the path does not exist, this will be handled later.
+ }
+
+ // double check that the output directory exists
+ try {
+ FileStatus check = getFileStatus(outPath);
+ if (check != null) {
+ if (check.isDir()) {
+ return true;
+ } else {
+ throw new IOException("FileSystem should create an output directory, but the path points to a file instead.");
+ }
+ }
+ // else: fall through the loop
+ }
+ catch (FileNotFoundException e) {
+ // fall though the loop
+ }
+
+ // delay to allow other threads to make progress in the I/O calls
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {}
}
- } else {
+ while (System.currentTimeMillis() < deadline);
+
+ return false;
+ }
+ else {
// check that the output path does not exist and an output file can be created by the output format.
return !this.exists(outPath);
}
[2/3] incubator-flink git commit: [runtime] In local mode, make sure taskmanagers have completed registration before starting a job.
Posted by se...@apache.org.
[runtime] In local mode, make sure taskmanagers have completed registration before starting a job.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ef9a3739
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ef9a3739
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ef9a3739
Branch: refs/heads/master
Commit: ef9a37390dfbd325b3bf2422334f99d22fca2a1a
Parents: ef40691
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 6 19:14:13 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 6 19:14:13 2014 +0100
----------------------------------------------------------------------
.../apache/flink/client/minicluster/NepheleMiniCluster.java | 9 +++++++++
1 file changed, 9 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ef9a3739/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
index e04006c..aac0786 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
@@ -267,6 +267,15 @@ public class NepheleMiniCluster {
while (jobManager.getNumberOfSlotsAvailableToScheduler() < numSlots) {
Thread.sleep(50);
}
+
+ // make sure that not just the jobmanager has the slots, but also the taskmanager
+ // has figured out its registration. under rare races, calls can be scheduled before that otherwise
+ TaskManager[] tms = getTaskManagers();
+ for (TaskManager tm : tms) {
+ while (tm.getRegisteredId() == null) {
+ Thread.sleep(10);
+ }
+ }
}
private static void initializeIOFormatClasses() {