You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/07/02 15:05:59 UTC

[flink] 04/06: [FLINK-12735][network] Refactor IOManager to introduce FileChannelManager

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f9acd2ff317b4a6181e85ba50ddbe177573351d7
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Mon Jul 1 23:31:30 2019 +0800

    [FLINK-12735][network] Refactor IOManager to introduce FileChannelManager
    
    IOManager mainly has two roles. One is managing file channels based on the configured temp dirs, and the other is abstracting the ways to read/write files.
    We could define a FileChannelManager class for handling the file channels, which could be reused by the shuffle environment in the future. That way the shuffle
    environment does not need to rely on the whole IOManager.
---
 .../flink/runtime/io/disk/FileChannelManager.java  |  45 ++++++++
 .../runtime/io/disk/FileChannelManagerImpl.java    | 126 +++++++++++++++++++++
 .../flink/runtime/io/disk/iomanager/IOManager.java | 119 ++++++-------------
 3 files changed, 203 insertions(+), 87 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManager.java
new file mode 100644
index 0000000..22079db
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManager.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
+
+import java.io.File;
+
+/**
+ * The manager used for creating/getting file IO channels based on config temp dirs.
+ *
+ * <p>The manager is {@link AutoCloseable}. {@code FileChannelManagerImpl}, for example, creates one
+ * storage sub-directory per configured temp dir and deletes those sub-directories on close.
+ */
+public interface FileChannelManager extends AutoCloseable {
+
+	/**
+	 * Creates an ID identifying an underlying file channel and returns it.
+	 *
+	 * @return The ID of a new channel located in one of the directories returned by {@link #getPaths()}.
+	 */
+	ID createChannel();
+
+	/**
+	 * Creates an enumerator for channels that logically belong together and returns it.
+	 *
+	 * @return A new enumerator over the directories returned by {@link #getPaths()}.
+	 */
+	Enumerator createChannelEnumerator();
+
+	/**
+	 * Gets the storage directories used for the file channels.
+	 *
+	 * <p>These are the sub-directories created under the configured temp dirs, not the configured
+	 * temp dirs themselves.
+	 *
+	 * @return The storage directories used for the file channels.
+	 */
+	File[] getPaths();
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java
new file mode 100644
index 0000000..2bdb8d9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The manager used for creating/deleting file channels based on config temp dirs.
+ *
+ * <p>On construction, one sub-directory named {@code flink-<prefix>-<random UUID>} is created under
+ * every configured temp dir. Anonymous channels are spread over these sub-directories in round-robin
+ * order, and {@link #close()} deletes the sub-directories again.
+ */
+public class FileChannelManagerImpl implements FileChannelManager {
+	private static final Logger LOG = LoggerFactory.getLogger(FileChannelManagerImpl.class);
+
+	/** The temporary directories for files. */
+	private final File[] paths;
+
+	/** A random number generator for the anonymous Channel IDs. */
+	private final Random random;
+
+	/** The number of the next path to use. */
+	private volatile int nextPath;
+
+	/**
+	 * Creates the manager and eagerly creates one storage sub-directory per temp dir.
+	 *
+	 * @param tempDirs The base directories under which the storage sub-directories are created.
+	 * @param prefix The prefix used in the names of the storage sub-directories.
+	 * @throws NullPointerException If {@code tempDirs} is null.
+	 * @throws IllegalArgumentException If {@code tempDirs} is empty.
+	 * @throws RuntimeException If a storage sub-directory cannot be created. Sub-directories that
+	 *                          were already created before the failure are removed again.
+	 */
+	public FileChannelManagerImpl(String[] tempDirs, String prefix) {
+		checkNotNull(tempDirs, "The temporary directories must not be null.");
+		checkArgument(tempDirs.length > 0, "The temporary directories must not be empty.");
+
+		this.random = new Random();
+		this.nextPath = 0;
+		this.paths = createFiles(tempDirs, prefix);
+	}
+
+	/**
+	 * Creates a uniquely named storage sub-directory under each of the given temp dirs.
+	 *
+	 * <p>If one of the sub-directories cannot be created, the ones created so far are deleted before
+	 * the exception is thrown. The constructor never completes in that case, so nobody could call
+	 * {@link #close()} to clean them up later.
+	 */
+	private static File[] createFiles(String[] tempDirs, String prefix) {
+		File[] files = new File[tempDirs.length];
+		for (int i = 0; i < tempDirs.length; i++) {
+			File baseDir = new File(tempDirs[i]);
+			String subfolder = String.format("flink-%s-%s", prefix, UUID.randomUUID().toString());
+			File storageDir = new File(baseDir, subfolder);
+
+			if (!storageDir.exists() && !storageDir.mkdirs()) {
+				RuntimeException failure = new RuntimeException(
+					"Could not create storage directory for FileChannelManager: " + storageDir.getAbsolutePath());
+				deleteCreatedDirectories(files, i, failure);
+				throw failure;
+			}
+			files[i] = storageDir;
+
+			LOG.info("FileChannelManager uses directory {} for spill files.", storageDir.getAbsolutePath());
+		}
+		return files;
+	}
+
+	/**
+	 * Best-effort deletion of the first {@code count} directories in {@code files}.
+	 * Deletion errors are attached to {@code failure} as suppressed exceptions instead of being
+	 * thrown, so the original creation error is what the caller sees.
+	 */
+	private static void deleteCreatedDirectories(File[] files, int count, RuntimeException failure) {
+		for (int j = 0; j < count; j++) {
+			try {
+				FileUtils.deleteDirectory(files[j]);
+			} catch (IOException e) {
+				failure.addSuppressed(e);
+			}
+		}
+	}
+
+	/** Creates a channel ID in the next storage directory (round-robin). */
+	@Override
+	public ID createChannel() {
+		int num = getNextPathNum();
+		return new ID(paths[num], num, random);
+	}
+
+	/** Creates an enumerator over all storage directories. */
+	@Override
+	public Enumerator createChannelEnumerator() {
+		return new Enumerator(paths, random);
+	}
+
+	/** Returns a copy of the storage directories, so callers cannot modify the internal array. */
+	@Override
+	public File[] getPaths() {
+		return Arrays.copyOf(paths, paths.length);
+	}
+
+	/**
+	 * Remove all the temp directories.
+	 *
+	 * <p>Directories that no longer exist are skipped. All remaining directories are attempted
+	 * even if deleting one of them fails.
+	 */
+	@Override
+	public void close() throws Exception {
+		IOUtils.closeAll(Arrays.stream(paths)
+			.filter(File::exists)
+			.map(FileChannelManagerImpl::getFileCloser)
+			.collect(Collectors.toList()));
+	}
+
+	/** Wraps the recursive deletion of {@code path} into an {@link AutoCloseable}. */
+	private static AutoCloseable getFileCloser(File path) {
+		return () -> {
+			try {
+				FileUtils.deleteDirectory(path);
+				LOG.info("FileChannelManager removed spill file directory {}", path.getAbsolutePath());
+			} catch (IOException e) {
+				String errorMessage = String.format("FileChannelManager failed to properly clean up temp file directory: %s", path);
+				throw new IOException(errorMessage, e);
+			}
+		};
+	}
+
+	/**
+	 * Returns the index of the path for the next channel, cycling through all paths round-robin.
+	 *
+	 * <p>The read-modify-write of {@code nextPath} is not atomic, so concurrent callers may
+	 * occasionally receive the same index. The returned value is always a valid index into
+	 * {@code paths}.
+	 */
+	private int getNextPathNum() {
+		int next = nextPath;
+		int newNext = next + 1;
+		nextPath = newNext >= paths.length ? 0 : newNext;
+		return next;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index a649e42..1be8639 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -19,24 +19,20 @@
 package org.apache.flink.runtime.io.disk.iomanager;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
 
-import org.apache.flink.util.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
-import java.util.Objects;
-import java.util.Random;
-import java.util.UUID;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
 
 /**
  * The facade for the provided I/O manager services.
@@ -44,14 +40,9 @@ import java.util.stream.Collectors;
 public abstract class IOManager implements AutoCloseable {
 	protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
 
-	/** The temporary directories for files. */
-	private final File[] paths;
+	/** Prefix for the spill sub-directories created under each temp dir (they are named "flink-io-<uuid>"). */
+	private static final String DIR_NAME_PREFIX = "io";

-	/** A random number generator for the anonymous ChannelIDs. */
-	private final Random random;
-
-	/** The number of the next path to use. */
-	private volatile int nextPath;
+	/** Manages the spill directories and creates the channel IDs and enumerators located in them. */
+	private final FileChannelManager fileChannelManager;
 
 	// -------------------------------------------------------------------------
 	//               Constructors / Destructors
@@ -63,26 +54,7 @@ public abstract class IOManager implements AutoCloseable {
 	 * @param tempDirs The basic directories for files underlying anonymous channels.
 	 */
 	protected IOManager(String[] tempDirs) {
-		if (tempDirs == null || tempDirs.length == 0) {
-			throw new IllegalArgumentException("The temporary directories must not be null or empty.");
-		}
-
-		this.random = new Random();
-		this.nextPath = 0;
-
-		this.paths = new File[tempDirs.length];
-		for (int i = 0; i < tempDirs.length; i++) {
-			File baseDir = new File(tempDirs[i]);
-			String subfolder = String.format("flink-io-%s", UUID.randomUUID().toString());
-			File storageDir = new File(baseDir, subfolder);
-
-			if (!storageDir.exists() && !storageDir.mkdirs()) {
-				throw new RuntimeException(
-						"Could not create storage directory for FileChannelManager: " + storageDir.getAbsolutePath());
-			}
-			paths[i] = storageDir;
-			LOG.info("I/O manager uses directory {} for spill files.", storageDir.getAbsolutePath());
-		}
+		// FileChannelManagerImpl rejects an empty array and creates one "flink-io-<uuid>"
+		// storage sub-directory under every temp dir, matching the previous directory naming.
+		this.fileChannelManager = new FileChannelManagerImpl(
+			Preconditions.checkNotNull(tempDirs, "The temporary directories must not be null."),
+			DIR_NAME_PREFIX);
 	}
 
 	/**
@@ -90,22 +62,7 @@ public abstract class IOManager implements AutoCloseable {
 	 */
 	@Override
 	public void close() throws Exception {
-		IOUtils.closeAll(Arrays.stream(paths)
-			.filter(File::exists)
-			.map(IOManager::getFileCloser)
-			.collect(Collectors.toList()));
-	}
-
-	private static AutoCloseable getFileCloser(File path) {
-		return () -> {
-			try {
-				FileUtils.deleteDirectory(path);
-				LOG.info("I/O manager removed spill file directory {}", path.getAbsolutePath());
-			} catch (IOException e) {
-				String errorMessage = String.format("IOManager failed to properly clean up temp file directory: %s", path);
-				throw new IOException(errorMessage, e);
-			}
-		};
+		// Deleting the spill directories created in the constructor is now done by the FileChannelManager.
+		fileChannelManager.close();
 	}
 
 	// ------------------------------------------------------------------------
@@ -119,8 +76,7 @@ public abstract class IOManager implements AutoCloseable {
 	 * @return A channel to a temporary directory.
 	 */
 	public ID createChannel() {
-		final int num = getNextPathNum();
-		return new ID(this.paths[num], num, this.random);
+		// Round-robin path selection and random ID generation now live in the FileChannelManager.
+		return fileChannelManager.createChannel();
 	}
 
 	/**
@@ -130,7 +86,7 @@ public abstract class IOManager implements AutoCloseable {
 	 * @return An enumerator for channels.
 	 */
 	public Enumerator createChannelEnumerator() {
-		return new Enumerator(this.paths, this.random);
+		// The enumerator is built by the FileChannelManager over its spill directories.
+		return fileChannelManager.createChannelEnumerator();
 	}
 
 	/**
@@ -147,6 +103,29 @@ public abstract class IOManager implements AutoCloseable {
 		}
 	}
 
+	/**
+	 * Returns the spill directories of this I/O manager, as reported by its FileChannelManager.
+	 *
+	 * @return The directories that the I/O manager spills to.
+	 */
+	public File[] getSpillingDirectories() {
+		return fileChannelManager.getPaths();
+	}
+
+	/**
+	 * Returns the absolute path of every spill directory of this I/O manager, in the same order
+	 * as the directories themselves.
+	 *
+	 * @return The directories that the I/O manager spills to, as path strings.
+	 */
+	public String[] getSpillingDirectoriesPaths() {
+		final File[] directories = fileChannelManager.getPaths();
+		final String[] absolutePaths = new String[directories.length];
+		int index = 0;
+		for (File directory : directories) {
+			absolutePaths[index++] = directory.getAbsolutePath();
+		}
+		return absolutePaths;
+	}
+
 	// ------------------------------------------------------------------------
 	//                        Reader / Writer instantiations
 	// ------------------------------------------------------------------------
@@ -245,38 +224,4 @@ public abstract class IOManager implements AutoCloseable {
 		ID channelID,
 		List<MemorySegment> targetSegments,
 		int numBlocks) throws IOException;
-
-
-	// ------------------------------------------------------------------------
-	//                          Utilities
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets the directories that the I/O manager spills to.
-	 *
-	 * @return The directories that the I/O manager spills to.
-	 */
-	public File[] getSpillingDirectories() {
-		return this.paths;
-	}
-
-	/**
-	 * Gets the directories that the I/O manager spills to, as path strings.
-	 *
-	 * @return The directories that the I/O manager spills to, as path strings.
-	 */
-	public String[] getSpillingDirectoriesPaths() {
-		String[] strings = new String[this.paths.length];
-		for (int i = 0; i < strings.length; i++) {
-			strings[i] = paths[i].getAbsolutePath();
-		}
-		return strings;
-	}
-
-	private int getNextPathNum() {
-		final int next = this.nextPath;
-		final int newNext = next + 1;
-		this.nextPath = newNext >= this.paths.length ? 0 : newNext;
-		return next;
-	}
 }