You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/21 14:13:32 UTC

[3/6] flink git commit: [FLINK-2834] Global round-robin for temporary directories

[FLINK-2834] Global round-robin for temporary directories

Multiple TaskManager filesystems can be used by configuring multiple temporary directories.
This patch changes the process of spilling files from a per-operator round-robin to a global
round-robin such that each directory is written to in turn across all operators, reducing
unbalanced I/O due to bunching.

This closes #1272


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dd3264e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dd3264e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dd3264e6

Branch: refs/heads/master
Commit: dd3264e6360130c935267a13ff23db9ec3128d18
Parents: e3cabe7
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Oct 20 10:47:58 2015 -0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Oct 21 11:46:06 2015 +0200

----------------------------------------------------------------------
 .../runtime/io/disk/iomanager/FileIOChannel.java    | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dd3264e6/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
index f9ee90c..fd8e8e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.flink.util.StringUtils;
 
@@ -153,21 +154,26 @@ public interface FileIOChannel {
 	 */
 	public static final class Enumerator {
 
+		private static final AtomicInteger globalCounter = new AtomicInteger();
+
 		private final File[] paths;
-		
+
 		private final String namePrefix;
 
-		private int counter;
+		private int localCounter;
 
 		protected Enumerator(File[] basePaths, Random random) {
 			this.paths = basePaths;
 			this.namePrefix = ID.randomString(random);
-			this.counter = 0;
+			this.localCounter = 0;
 		}
 
 		public ID next() {
-			int threadNum = counter % paths.length;
-			String filename = String.format("%s.%06d.channel", namePrefix, (counter++));
+			// The local counter numbers file names; the global counter picks the directory (and its
+			// read/write threads) round-robin across all spilling operators to avoid I/O bunching.
+			// The sign bit is masked so the index stays non-negative after the counter overflows.
+			int threadNum = (globalCounter.getAndIncrement() & Integer.MAX_VALUE) % paths.length;
+			String filename = String.format("%s.%06d.channel", namePrefix, (localCounter++));
 			return new ID(new File(paths[threadNum], filename), threadNum);
 		}
 	}