You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/21 14:13:32 UTC
[3/6] flink git commit: [FLINK-2834] Global round-robin for temporary directories
[FLINK-2834] Global round-robin for temporary directories
Multiple TaskManager filesystems can be used by configuring multiple temporary directories.
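This patch changes how spill files are assigned to those directories: instead of a per-operator
round-robin, a single global round-robin is used, so that each directory is written to in turn
across all operators. This reduces unbalanced I/O caused by bunching (several operators spilling
to the same directory at the same time).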
This closes #1272
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dd3264e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dd3264e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dd3264e6
Branch: refs/heads/master
Commit: dd3264e6360130c935267a13ff23db9ec3128d18
Parents: e3cabe7
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Oct 20 10:47:58 2015 -0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Oct 21 11:46:06 2015 +0200
----------------------------------------------------------------------
.../runtime/io/disk/iomanager/FileIOChannel.java | 16 +++++++++++-----
1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dd3264e6/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
index f9ee90c..fd8e8e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.util.StringUtils;
@@ -153,21 +154,26 @@ public interface FileIOChannel {
*/
public static final class Enumerator {
+ private static final AtomicInteger globalCounter = new AtomicInteger();
+
private final File[] paths;
-
+
private final String namePrefix;
- private int counter;
+ private int localCounter;
protected Enumerator(File[] basePaths, Random random) {
this.paths = basePaths;
this.namePrefix = ID.randomString(random);
- this.counter = 0;
+ this.localCounter = 0;
}
public ID next() {
- int threadNum = counter % paths.length;
- String filename = String.format("%s.%06d.channel", namePrefix, (counter++));
+ // The local counter names files; the global counter selects the directory and its
+ // read/write threads, giving a round-robin across all spilling operators. The sign bit
+ // is masked off so the index stays non-negative after the shared counter overflows.
+ int threadNum = (globalCounter.getAndIncrement() & Integer.MAX_VALUE) % paths.length;
+ String filename = String.format("%s.%06d.channel", namePrefix, (localCounter++));
return new ID(new File(paths[threadNum], filename), threadNum);
}
}