You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/21 14:41:49 UTC
flink git commit: [FLINK-2834] Global round-robin for temporary directories
Repository: flink
Updated Branches:
refs/heads/release-0.9 b3429ed55 -> 96099951e
[FLINK-2834] Global round-robin for temporary directories
Multiple TaskManager filesystems can be used by configuring multiple temporary directories.
This patch changes the process of spilling files from a per-operator round-robin to a global
round-robin such that each directory is written to in turn across all operators, reducing
unbalanced I/O due to bunching.
This closes #1272
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/96099951
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/96099951
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/96099951
Branch: refs/heads/release-0.9
Commit: 96099951e031adf37a5dba6d1c46c9eb6d43a52a
Parents: b3429ed
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Oct 20 10:47:58 2015 -0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Oct 21 14:41:08 2015 +0200
----------------------------------------------------------------------
.../runtime/io/disk/iomanager/FileIOChannel.java | 16 +++++++++++-----
1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/96099951/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
index f9ee90c..fd8e8e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.util.StringUtils;
@@ -153,21 +154,26 @@ public interface FileIOChannel {
*/
public static final class Enumerator {
+ private static final AtomicInteger globalCounter = new AtomicInteger();
+
private final File[] paths;
-
+
private final String namePrefix;
- private int counter;
+ private int localCounter;
protected Enumerator(File[] basePaths, Random random) {
this.paths = basePaths;
this.namePrefix = ID.randomString(random);
- this.counter = 0;
+ this.localCounter = 0;
}
public ID next() {
- int threadNum = counter % paths.length;
- String filename = String.format("%s.%06d.channel", namePrefix, (counter++));
+ // The local counter numbers file names; the global counter (static, shared by all enumerators)
+ // picks the directory and its read/write thread round-robin across operators to avoid I/O bunching.
+ // The sign bit is masked off so the index stays non-negative after the global counter overflows.
+ int threadNum = (globalCounter.getAndIncrement() & Integer.MAX_VALUE) % paths.length;
+ String filename = String.format("%s.%06d.channel", namePrefix, (localCounter++));
return new ID(new File(paths[threadNum], filename), threadNum);
}
}