You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/07/02 15:05:59 UTC
[flink] 04/06: [FLINK-12735][network] Refactor IOManager to
introduce FileChannelManager
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f9acd2ff317b4a6181e85ba50ddbe177573351d7
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Mon Jul 1 23:31:30 2019 +0800
[FLINK-12735][network] Refactor IOManager to introduce FileChannelManager
IOManager mainly has two roles: one is managing file channels based on the configured temp dirs, and the other is abstracting the ways to read/write files.
We can define a FileChannelManager class for handling the file channels, which the shuffle environment could reuse in the future. That way the shuffle
environment does not need to rely on the whole IOManager.
---
.../flink/runtime/io/disk/FileChannelManager.java | 45 ++++++++
.../runtime/io/disk/FileChannelManagerImpl.java | 126 +++++++++++++++++++++
.../flink/runtime/io/disk/iomanager/IOManager.java | 119 ++++++-------------
3 files changed, 203 insertions(+), 87 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManager.java
new file mode 100644
index 0000000..22079db
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManager.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
+
+import java.io.File;
+
+/**
+ * The manager used for creating file IO channels and channel enumerators based on the configured temp dirs.
+ */
+public interface FileChannelManager extends AutoCloseable {
+
+ /**
+ * Creates an ID identifying a new underlying file channel located in one of the temp dirs and returns it.
+ */
+ ID createChannel();
+
+ /**
+ * Creates an enumerator for channels that logically belong together and returns it.
+ */
+ Enumerator createChannelEnumerator();
+
+ /**
+ * Gets the directories (one per configured temp dir) that channels are created in.
+ */
+ File[] getPaths();
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java
new file mode 100644
index 0000000..2bdb8d9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Creates file channels in one unique sub-directory per configured temp dir and deletes those sub-directories on {@link #close()}.
+ */
+public class FileChannelManagerImpl implements FileChannelManager {
+ private static final Logger LOG = LoggerFactory.getLogger(FileChannelManagerImpl.class);
+
+ /** One unique storage directory per configured temp dir, created eagerly by the constructor. */
+ private final File[] paths;
+
+ /** Random source handed to every channel {@link ID} and {@link Enumerator} created by this manager. */
+ private final Random random;
+
+ /** Round-robin index of the next path; updated without atomicity, but always kept within [0, paths.length). */
+ private volatile int nextPath;
+
+ public FileChannelManagerImpl(String[] tempDirs, String prefix) {
+ checkNotNull(tempDirs, "The temporary directories must not be null.");
+ checkArgument(tempDirs.length > 0, "The temporary directories must not be empty.");
+
+ this.random = new Random();
+ this.nextPath = 0;
+ this.paths = createFiles(tempDirs, prefix);
+ }
+
+ private static File[] createFiles(String[] tempDirs, String prefix) {
+ File[] files = new File[tempDirs.length];
+ for (int i = 0; i < tempDirs.length; i++) {
+ File baseDir = new File(tempDirs[i]);
+ String subfolder = String.format("flink-%s-%s", prefix, UUID.randomUUID().toString());
+ File storageDir = new File(baseDir, subfolder);
+ // On failure, remove the directories created in earlier iterations: the constructor aborts, so close() can never clean them up.
+ if (!storageDir.exists() && !storageDir.mkdirs()) {
+ Arrays.stream(files, 0, i).forEach(FileUtils::deleteDirectoryQuietly);
+ throw new RuntimeException("Could not create storage directory for FileChannelManager: " + storageDir.getAbsolutePath());
+ }
+ files[i] = storageDir;
+
+ LOG.info("FileChannelManager uses directory {} for spill files.", storageDir.getAbsolutePath());
+ }
+ return files;
+ }
+
+ @Override
+ public ID createChannel() {
+ int num = getNextPathNum();
+ return new ID(paths[num], num, random);
+ }
+
+ @Override
+ public Enumerator createChannelEnumerator() {
+ return new Enumerator(paths, random);
+ }
+
+ @Override
+ public File[] getPaths() {
+ return Arrays.copyOf(paths, paths.length); // defensive copy so callers cannot replace the managed directories
+ }
+
+ /**
+ * Deletes each temp directory that still exists via FileUtils.deleteDirectory; a failure is rethrown as an IOException naming the directory.
+ */
+ @Override
+ public void close() throws Exception {
+ IOUtils.closeAll(Arrays.stream(paths)
+ .filter(File::exists)
+ .map(FileChannelManagerImpl::getFileCloser)
+ .collect(Collectors.toList()));
+ }
+
+ private static AutoCloseable getFileCloser(File path) {
+ return () -> {
+ try {
+ FileUtils.deleteDirectory(path);
+ LOG.info("FileChannelManager removed spill file directory {}", path.getAbsolutePath());
+ } catch (IOException e) {
+ String errorMessage = String.format("FileChannelManager failed to properly clean up temp file directory: %s", path);
+ throw new IOException(errorMessage, e);
+ }
+ };
+ }
+
+ private int getNextPathNum() {
+ int next = nextPath; // read-then-write is not atomic: concurrent callers may get the same index, which only skews the round-robin
+ int newNext = next + 1;
+ nextPath = newNext >= paths.length ? 0 : newNext;
+ return next;
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index a649e42..1be8639 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -19,24 +19,20 @@
package org.apache.flink.runtime.io.disk.iomanager;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.util.Arrays;
import java.util.List;
-import java.util.Objects;
-import java.util.Random;
-import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
/**
* The facade for the provided I/O manager services.
@@ -44,14 +40,9 @@ import java.util.stream.Collectors;
public abstract class IOManager implements AutoCloseable {
protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
- /** The temporary directories for files. */
- private final File[] paths;
+ private static final String DIR_NAME_PREFIX = "io";
- /** A random number generator for the anonymous ChannelIDs. */
- private final Random random;
-
- /** The number of the next path to use. */
- private volatile int nextPath;
+ private final FileChannelManager fileChannelManager;
// -------------------------------------------------------------------------
// Constructors / Destructors
@@ -63,26 +54,7 @@ public abstract class IOManager implements AutoCloseable {
* @param tempDirs The basic directories for files underlying anonymous channels.
*/
protected IOManager(String[] tempDirs) {
- if (tempDirs == null || tempDirs.length == 0) {
- throw new IllegalArgumentException("The temporary directories must not be null or empty.");
- }
-
- this.random = new Random();
- this.nextPath = 0;
-
- this.paths = new File[tempDirs.length];
- for (int i = 0; i < tempDirs.length; i++) {
- File baseDir = new File(tempDirs[i]);
- String subfolder = String.format("flink-io-%s", UUID.randomUUID().toString());
- File storageDir = new File(baseDir, subfolder);
-
- if (!storageDir.exists() && !storageDir.mkdirs()) {
- throw new RuntimeException(
- "Could not create storage directory for IOManager: " + storageDir.getAbsolutePath());
- }
- paths[i] = storageDir;
- LOG.info("I/O manager uses directory {} for spill files.", storageDir.getAbsolutePath());
- }
+ this.fileChannelManager = new FileChannelManagerImpl(Preconditions.checkNotNull(tempDirs), DIR_NAME_PREFIX); // NOTE(review): a null tempDirs now fails in checkNotNull (presumably NPE) instead of the former IllegalArgumentException; empty arrays are rejected by checkArgument in FileChannelManagerImpl
}
/**
@@ -90,22 +62,7 @@ public abstract class IOManager implements AutoCloseable {
*/
@Override
public void close() throws Exception {
- IOUtils.closeAll(Arrays.stream(paths)
- .filter(File::exists)
- .map(IOManager::getFileCloser)
- .collect(Collectors.toList()));
- }
-
- private static AutoCloseable getFileCloser(File path) {
- return () -> {
- try {
- FileUtils.deleteDirectory(path);
- LOG.info("I/O manager removed spill file directory {}", path.getAbsolutePath());
- } catch (IOException e) {
- String errorMessage = String.format("IOManager failed to properly clean up temp file directory: %s", path);
- throw new IOException(errorMessage, e);
- }
- };
+ fileChannelManager.close(); // deletes the spill directories created in the constructor
}
// ------------------------------------------------------------------------
@@ -119,8 +76,7 @@ public abstract class IOManager implements AutoCloseable {
* @return A channel to a temporary directory.
*/
public ID createChannel() {
- final int num = getNextPathNum();
- return new ID(this.paths[num], num, this.random);
+ return fileChannelManager.createChannel(); // the channel manager picks the spilling directory round-robin
}
/**
@@ -130,7 +86,7 @@ public abstract class IOManager implements AutoCloseable {
* @return An enumerator for channels.
*/
public Enumerator createChannelEnumerator() {
- return new Enumerator(this.paths, this.random);
+ return fileChannelManager.createChannelEnumerator(); // enumerator is built over all spilling directories
}
/**
@@ -147,6 +103,29 @@ public abstract class IOManager implements AutoCloseable {
}
}
+ /**
+ * Gets the directories that the I/O manager spills to.
+ *
+ * @return The directories that the I/O manager spills to, as a newly allocated array on each call.
+ */
+ public File[] getSpillingDirectories() {
+ return fileChannelManager.getPaths();
+ }
+
+ /**
+ * Gets the directories that the I/O manager spills to, as path strings.
+ *
+ * @return The absolute paths of the spilling directories, in the same order as {@link #getSpillingDirectories()}.
+ */
+ public String[] getSpillingDirectoriesPaths() {
+ File[] paths = fileChannelManager.getPaths();
+ String[] strings = new String[paths.length];
+ for (int i = 0; i < strings.length; i++) {
+ strings[i] = paths[i].getAbsolutePath();
+ }
+ return strings;
+ }
+
// ------------------------------------------------------------------------
// Reader / Writer instantiations
// ------------------------------------------------------------------------
@@ -245,38 +224,4 @@ public abstract class IOManager implements AutoCloseable {
ID channelID,
List<MemorySegment> targetSegments,
int numBlocks) throws IOException;
-
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- /**
- * Gets the directories that the I/O manager spills to.
- *
- * @return The directories that the I/O manager spills to.
- */
- public File[] getSpillingDirectories() {
- return this.paths;
- }
-
- /**
- * Gets the directories that the I/O manager spills to, as path strings.
- *
- * @return The directories that the I/O manager spills to, as path strings.
- */
- public String[] getSpillingDirectoriesPaths() {
- String[] strings = new String[this.paths.length];
- for (int i = 0; i < strings.length; i++) {
- strings[i] = paths[i].getAbsolutePath();
- }
- return strings;
- }
-
- private int getNextPathNum() {
- final int next = this.nextPath;
- final int newNext = next + 1;
- this.nextPath = newNext >= this.paths.length ? 0 : newNext;
- return next;
- }
}