Posted to commits@flink.apache.org by ch...@apache.org on 2019/07/02 15:05:55 UTC

[flink] branch master updated (95944e2 -> a0f747d)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 95944e2  [FLINK-12798][table] Add a discovery mechanism for switching between Flink/Blink Planner/Executor
     new cdbfb82  [hotfix][runtime] Cleanup IOManager code
     new 5809ed3  [hotfix][runtime] IOManager implements AutoCloseable
     new 423e8a8  [hotfix][runtime] Refactor IOManager#close and remove #isProperlyShutDown
     new f9acd2f  [FLINK-12735][network] Refactor IOManager to introduce FileChannelManager
     new 80003d6  [FLINK-12735][network] Make shuffle environment implementation independent with IOManager
     new a0f747d  [hotfix][runtime] Remove legacy NoOpIOManager class

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/runtime/io/disk/FileChannelManager.java  |  37 ++--
 .../runtime/io/disk/FileChannelManagerImpl.java    | 126 +++++++++++++
 .../runtime/io/disk/iomanager/FileIOChannel.java   |  78 ++++----
 .../flink/runtime/io/disk/iomanager/IOManager.java | 206 +++++++--------------
 .../runtime/io/disk/iomanager/IOManagerAsync.java  | 107 ++++++-----
 .../io/network/NettyShuffleEnvironment.java        |  13 ++
 .../io/network/NettyShuffleServiceFactory.java     |  17 +-
 .../network/partition/ResultPartitionFactory.java  |  16 +-
 .../runtime/shuffle/ShuffleEnvironmentContext.java |  10 +-
 .../runtime/taskexecutor/TaskManagerServices.java  |  11 +-
 .../NettyShuffleEnvironmentConfiguration.java      |  23 ++-
 .../flink/runtime/io/disk/ChannelViewsTest.java    |   7 +-
 .../runtime/io/disk/FileChannelStreamsITCase.java  |   5 +-
 .../runtime/io/disk/FileChannelStreamsTest.java    |  12 +-
 .../runtime/io/disk/NoOpFileChannelManager.java}   |  33 ++--
 .../io/disk/SeekableFileChannelInputViewTest.java  |   6 +-
 .../flink/runtime/io/disk/SpillingBufferTest.java  |   7 +-
 .../AsynchronousBufferFileWriterTest.java          |   4 +-
 .../iomanager/AsynchronousFileIOChannelTest.java   |  21 +--
 .../BufferFileWriterFileSegmentReaderTest.java     |   4 +-
 .../disk/iomanager/BufferFileWriterReaderTest.java |   4 +-
 .../io/disk/iomanager/IOManagerAsyncTest.java      |   7 +-
 .../IOManagerAsyncWithNoOpBufferFileWriter.java    |  53 ------
 .../runtime/io/disk/iomanager/IOManagerITCase.java |   3 +-
 .../runtime/io/disk/iomanager/IOManagerTest.java   |  35 ++--
 .../runtime/io/disk/iomanager/NoOpIOManager.java   |  73 --------
 .../io/network/NettyShuffleEnvironmentBuilder.java |  17 +-
 .../io/network/NettyShuffleEnvironmentTest.java    |  23 ++-
 .../partition/BoundedBlockingSubpartitionTest.java |  24 ++-
 .../BoundedBlockingSubpartitionWriteReadTest.java  |  21 ++-
 .../io/network/partition/PartitionTestUtils.java   |  21 +++
 .../network/partition/ResultPartitionBuilder.java  |  12 +-
 .../partition/ResultPartitionFactoryTest.java      |  22 ++-
 .../io/network/partition/ResultPartitionTest.java  |  27 ++-
 .../runtime/operators/hash/HashTableITCase.java    |   7 +-
 .../hash/HashTablePerformanceComparison.java       |   5 +-
 .../runtime/operators/hash/HashTableTest.java      |  21 +--
 .../hash/NonReusingHashJoinIteratorITCase.java     |   7 +-
 .../operators/hash/ReOpenableHashTableITCase.java  |   7 +-
 .../hash/ReOpenableHashTableTestBase.java          |   7 +-
 .../hash/ReusingHashJoinIteratorITCase.java        |   7 +-
 .../resettable/SpillingResettableIteratorTest.java |   7 +-
 ...pillingResettableMutableObjectIteratorTest.java |   7 +-
 .../AbstractSortMergeOuterJoinIteratorITCase.java  |   7 +-
 .../sort/CombiningUnilateralSortMergerITCase.java  |   7 +-
 .../runtime/operators/sort/ExternalSortITCase.java |   7 +-
 .../sort/ExternalSortLargeRecordsITCase.java       |   7 +-
 .../sort/FixedLengthRecordSorterTest.java          |   4 +-
 .../operators/sort/LargeRecordHandlerITCase.java   |  42 ++---
 .../operators/sort/LargeRecordHandlerTest.java     |  21 +--
 ...NonReusingSortMergeInnerJoinIteratorITCase.java |   7 +-
 .../ReusingSortMergeInnerJoinIteratorITCase.java   |   7 +-
 .../operators/sort/UnilateralSortMergerTest.java   |   4 +-
 .../testutils/BinaryOperatorTestBase.java          |   5 +-
 .../operators/testutils/DriverTestBase.java        |   3 +-
 .../operators/testutils/MockEnvironment.java       |   6 +-
 .../runtime/operators/testutils/TaskTestBase.java  |   2 +-
 .../operators/testutils/UnaryOperatorTestBase.java |   3 +-
 .../operators/util/HashVsSortMiniBenchmark.java    |   7 +-
 .../runtime/taskexecutor/TaskExecutorTest.java     |   1 -
 .../streaming/runtime/io/BufferSpillerTest.java    |   4 +-
 ...CheckpointBarrierAlignerAlignmentLimitTest.java |   4 +-
 .../CheckpointBarrierAlignerMassiveRandomTest.java |   7 +-
 .../io/SpillingCheckpointBarrierAlignerTest.java   |   4 +-
 .../StreamNetworkBenchmarkEnvironment.java         |   7 -
 .../streaming/runtime/tasks/OperatorChainTest.java |   2 +-
 .../runtime/tasks/StreamTaskTestHarness.java       |   3 +-
 .../util/AbstractStreamOperatorTestHarness.java    |   4 +-
 .../flink/table/runtime/aggregate/HashAggTest.java |   7 +-
 .../runtime/hashtable/BinaryHashTableTest.java     |   7 +-
 .../io/CompressedHeaderlessChannelTest.java        |   8 +-
 .../join/Int2SortMergeJoinOperatorTest.java        |   7 +-
 .../runtime/sort/BinaryExternalSorterTest.java     |   7 +-
 .../runtime/sort/BufferedKVExternalSorterTest.java |   7 +-
 .../manual/HashTableRecordWidthCombinations.java   |   7 +-
 .../flink/test/manual/MassiveStringSorting.java    |  14 +-
 .../test/manual/MassiveStringValueSorting.java     |  14 +-
 77 files changed, 640 insertions(+), 773 deletions(-)
 copy flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/KvStateRequestStats.java => flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManager.java (56%)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java
 copy flink-runtime/src/{main/java/org/apache/flink/runtime/blob/VoidBlobStore.java => test/java/org/apache/flink/runtime/io/disk/NoOpFileChannelManager.java} (58%)
 delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java
 delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/NoOpIOManager.java


[flink] 04/06: [FLINK-12735][network] Refactor IOManager to introduce FileChannelManager

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f9acd2ff317b4a6181e85ba50ddbe177573351d7
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Mon Jul 1 23:31:30 2019 +0800

    [FLINK-12735][network] Refactor IOManager to introduce FileChannelManager
    
    IOManager has two main roles: it manages file channels based on the configured temp dirs, and it abstracts the ways to read/write files.
    We introduce a FileChannelManager class for handling the file channels, which the shuffle environment can reuse in the future. That way the
    shuffle environment does not need to rely on the whole IOManager.
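A minimal usage sketch (an editorial illustration, not part of this commit): it exercises only the FileChannelManager API introduced by this
commit (createChannel, getPaths, close). The class name, the "example" prefix, and the use of java.io.tmpdir are illustrative assumptions.

import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;

import java.io.File;

public class FileChannelManagerUsageSketch {

	public static void main(String[] args) throws Exception {
		String[] tempDirs = new String[] {System.getProperty("java.io.tmpdir")};

		// The manager creates one "flink-<prefix>-<uuid>" sub-directory per configured temp dir.
		try (FileChannelManager channelManager = new FileChannelManagerImpl(tempDirs, "example")) {
			// Channels are assigned to the temp directories in round-robin fashion.
			FileIOChannel.ID channel = channelManager.createChannel();
			File spillFile = channel.getPathFile();
			System.out.println("Spill file for this channel: " + spillFile.getAbsolutePath());

			for (File dir : channelManager.getPaths()) {
				System.out.println("Managed spill directory: " + dir.getAbsolutePath());
			}
		}
		// close() deletes all managed spill directories.
	}
}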
---
 .../flink/runtime/io/disk/FileChannelManager.java  |  45 ++++++++
 .../runtime/io/disk/FileChannelManagerImpl.java    | 126 +++++++++++++++++++++
 .../flink/runtime/io/disk/iomanager/IOManager.java | 119 ++++++-------------
 3 files changed, 203 insertions(+), 87 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManager.java
new file mode 100644
index 0000000..22079db
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManager.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
+
+import java.io.File;
+
+/**
+ * The manager used for creating/getting file IO channels based on config temp dirs.
+ */
+public interface FileChannelManager extends AutoCloseable {
+
+	/**
+	 * Creates an ID identifying an underlying file channel and returns it.
+	 */
+	ID createChannel();
+
+	/**
+	 * Creates an enumerator for channels that logically belong together and returns it.
+	 */
+	Enumerator createChannelEnumerator();
+
+	/**
+	 * Gets all the files corresponding to the config temp dirs.
+	 */
+	File[] getPaths();
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java
new file mode 100644
index 0000000..2bdb8d9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The manager used for creating/deleting file channels based on config temp dirs.
+ */
+public class FileChannelManagerImpl implements FileChannelManager {
+	private static final Logger LOG = LoggerFactory.getLogger(FileChannelManagerImpl.class);
+
+	/** The temporary directories for files. */
+	private final File[] paths;
+
+	/** A random number generator for the anonymous Channel IDs. */
+	private final Random random;
+
+	/** The number of the next path to use. */
+	private volatile int nextPath;
+
+	public FileChannelManagerImpl(String[] tempDirs, String prefix) {
+		checkNotNull(tempDirs, "The temporary directories must not be null.");
+		checkArgument(tempDirs.length > 0, "The temporary directories must not be empty.");
+
+		this.random = new Random();
+		this.nextPath = 0;
+		this.paths = createFiles(tempDirs, prefix);
+	}
+
+	private static File[] createFiles(String[] tempDirs, String prefix) {
+		File[] files = new File[tempDirs.length];
+		for (int i = 0; i < tempDirs.length; i++) {
+			File baseDir = new File(tempDirs[i]);
+			String subfolder = String.format("flink-%s-%s", prefix, UUID.randomUUID().toString());
+			File storageDir = new File(baseDir, subfolder);
+
+			if (!storageDir.exists() && !storageDir.mkdirs()) {
+				throw new RuntimeException(
+					"Could not create storage directory for FileChannelManager: " + storageDir.getAbsolutePath());
+			}
+			files[i] = storageDir;
+
+			LOG.info("FileChannelManager uses directory {} for spill files.", storageDir.getAbsolutePath());
+		}
+		return files;
+	}
+
+	@Override
+	public ID createChannel() {
+		int num = getNextPathNum();
+		return new ID(paths[num], num, random);
+	}
+
+	@Override
+	public Enumerator createChannelEnumerator() {
+		return new Enumerator(paths, random);
+	}
+
+	@Override
+	public File[] getPaths() {
+		return Arrays.copyOf(paths, paths.length);
+	}
+
+	/**
+	 * Remove all the temp directories.
+	 */
+	@Override
+	public void close() throws Exception {
+		IOUtils.closeAll(Arrays.stream(paths)
+			.filter(File::exists)
+			.map(FileChannelManagerImpl::getFileCloser)
+			.collect(Collectors.toList()));
+	}
+
+	private static AutoCloseable getFileCloser(File path) {
+		return () -> {
+			try {
+				FileUtils.deleteDirectory(path);
+				LOG.info("FileChannelManager removed spill file directory {}", path.getAbsolutePath());
+			} catch (IOException e) {
+				String errorMessage = String.format("FileChannelManager failed to properly clean up temp file directory: %s", path);
+				throw new IOException(errorMessage, e);
+			}
+		};
+	}
+
+	private int getNextPathNum() {
+		int next = nextPath;
+		int newNext = next + 1;
+		nextPath = newNext >= paths.length ? 0 : newNext;
+		return next;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index a649e42..1be8639 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -19,24 +19,20 @@
 package org.apache.flink.runtime.io.disk.iomanager;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
 
-import org.apache.flink.util.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
-import java.util.Objects;
-import java.util.Random;
-import java.util.UUID;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
 
 /**
  * The facade for the provided I/O manager services.
@@ -44,14 +40,9 @@ import java.util.stream.Collectors;
 public abstract class IOManager implements AutoCloseable {
 	protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
 
-	/** The temporary directories for files. */
-	private final File[] paths;
+	private static final String DIR_NAME_PREFIX = "io";
 
-	/** A random number generator for the anonymous ChannelIDs. */
-	private final Random random;
-
-	/** The number of the next path to use. */
-	private volatile int nextPath;
+	private final FileChannelManager fileChannelManager;
 
 	// -------------------------------------------------------------------------
 	//               Constructors / Destructors
@@ -63,26 +54,7 @@ public abstract class IOManager implements AutoCloseable {
 	 * @param tempDirs The basic directories for files underlying anonymous channels.
 	 */
 	protected IOManager(String[] tempDirs) {
-		if (tempDirs == null || tempDirs.length == 0) {
-			throw new IllegalArgumentException("The temporary directories must not be null or empty.");
-		}
-
-		this.random = new Random();
-		this.nextPath = 0;
-
-		this.paths = new File[tempDirs.length];
-		for (int i = 0; i < tempDirs.length; i++) {
-			File baseDir = new File(tempDirs[i]);
-			String subfolder = String.format("flink-io-%s", UUID.randomUUID().toString());
-			File storageDir = new File(baseDir, subfolder);
-
-			if (!storageDir.exists() && !storageDir.mkdirs()) {
-				throw new RuntimeException(
-						"Could not create storage directory for IOManager: " + storageDir.getAbsolutePath());
-			}
-			paths[i] = storageDir;
-			LOG.info("I/O manager uses directory {} for spill files.", storageDir.getAbsolutePath());
-		}
+		this.fileChannelManager = new FileChannelManagerImpl(Preconditions.checkNotNull(tempDirs), DIR_NAME_PREFIX);
 	}
 
 	/**
@@ -90,22 +62,7 @@ public abstract class IOManager implements AutoCloseable {
 	 */
 	@Override
 	public void close() throws Exception {
-		IOUtils.closeAll(Arrays.stream(paths)
-			.filter(File::exists)
-			.map(IOManager::getFileCloser)
-			.collect(Collectors.toList()));
-	}
-
-	private static AutoCloseable getFileCloser(File path) {
-		return () -> {
-			try {
-				FileUtils.deleteDirectory(path);
-				LOG.info("I/O manager removed spill file directory {}", path.getAbsolutePath());
-			} catch (IOException e) {
-				String errorMessage = String.format("IOManager failed to properly clean up temp file directory: %s", path);
-				throw new IOException(errorMessage, e);
-			}
-		};
+		fileChannelManager.close();
 	}
 
 	// ------------------------------------------------------------------------
@@ -119,8 +76,7 @@ public abstract class IOManager implements AutoCloseable {
 	 * @return A channel to a temporary directory.
 	 */
 	public ID createChannel() {
-		final int num = getNextPathNum();
-		return new ID(this.paths[num], num, this.random);
+		return fileChannelManager.createChannel();
 	}
 
 	/**
@@ -130,7 +86,7 @@ public abstract class IOManager implements AutoCloseable {
 	 * @return An enumerator for channels.
 	 */
 	public Enumerator createChannelEnumerator() {
-		return new Enumerator(this.paths, this.random);
+		return fileChannelManager.createChannelEnumerator();
 	}
 
 	/**
@@ -147,6 +103,29 @@ public abstract class IOManager implements AutoCloseable {
 		}
 	}
 
+	/**
+	 * Gets the directories that the I/O manager spills to.
+	 *
+	 * @return The directories that the I/O manager spills to.
+	 */
+	public File[] getSpillingDirectories() {
+		return fileChannelManager.getPaths();
+	}
+
+	/**
+	 * Gets the directories that the I/O manager spills to, as path strings.
+	 *
+	 * @return The directories that the I/O manager spills to, as path strings.
+	 */
+	public String[] getSpillingDirectoriesPaths() {
+		File[] paths = fileChannelManager.getPaths();
+		String[] strings = new String[paths.length];
+		for (int i = 0; i < strings.length; i++) {
+			strings[i] = paths[i].getAbsolutePath();
+		}
+		return strings;
+	}
+
 	// ------------------------------------------------------------------------
 	//                        Reader / Writer instantiations
 	// ------------------------------------------------------------------------
@@ -245,38 +224,4 @@ public abstract class IOManager implements AutoCloseable {
 		ID channelID,
 		List<MemorySegment> targetSegments,
 		int numBlocks) throws IOException;
-
-
-	// ------------------------------------------------------------------------
-	//                          Utilities
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets the directories that the I/O manager spills to.
-	 *
-	 * @return The directories that the I/O manager spills to.
-	 */
-	public File[] getSpillingDirectories() {
-		return this.paths;
-	}
-
-	/**
-	 * Gets the directories that the I/O manager spills to, as path strings.
-	 *
-	 * @return The directories that the I/O manager spills to, as path strings.
-	 */
-	public String[] getSpillingDirectoriesPaths() {
-		String[] strings = new String[this.paths.length];
-		for (int i = 0; i < strings.length; i++) {
-			strings[i] = paths[i].getAbsolutePath();
-		}
-		return strings;
-	}
-
-	private int getNextPathNum() {
-		final int next = this.nextPath;
-		final int newNext = next + 1;
-		this.nextPath = newNext >= this.paths.length ? 0 : newNext;
-		return next;
-	}
 }


[flink] 05/06: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 80003d62f386fc95a3dbbc414f2cd4de7a26e1bd
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Thu Jun 27 00:29:16 2019 +0800

    [FLINK-12735][network] Make shuffle environment implementation independent with IOManager
    
    The creation of NettyShuffleEnvironment currently relies on the IOManager from TaskManagerServices. However, the shuffle environment only
    needs file channels while creating partitions, so it can internally create a lightweight FileChannelManager with its own folder-name
    prefix instead of depending on the heavyweight IOManagerAsync.
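A test-style sketch (an editorial illustration, not part of this commit) of the new wiring: the shuffle side now builds its own
FileChannelManager from the configured temp dirs instead of borrowing the TaskManager's IOManager. It mirrors the test utilities changed
below (FileChannelManagerImpl, ResultPartitionBuilder#setFileChannelManager); the class name and the "netty-shuffle-sketch" prefix are
illustrative assumptions.

import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.util.EnvironmentInformation;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class ShuffleFileChannelWiringSketchTest {

	private static FileChannelManager fileChannelManager;

	@BeforeClass
	public static void setUp() {
		// The shuffle side owns its spill directories, created under the configured temp dir.
		fileChannelManager = new FileChannelManagerImpl(
			new String[] {EnvironmentInformation.getTemporaryFileDirectory()}, "netty-shuffle-sketch");
	}

	@AfterClass
	public static void shutdown() throws Exception {
		// Removes the spill directories created above.
		fileChannelManager.close();
	}

	@Test
	public void blockingPartitionSpillsViaFileChannelManager() {
		// BLOCKING partitions obtain their spill files from the FileChannelManager,
		// so no IOManager is involved anymore.
		ResultPartition partition = new ResultPartitionBuilder()
			.setResultPartitionType(ResultPartitionType.BLOCKING)
			.setFileChannelManager(fileChannelManager)
			.build();
	}
}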
---
 .../io/network/NettyShuffleEnvironment.java        | 13 ++++++
 .../io/network/NettyShuffleServiceFactory.java     | 17 +++++---
 .../network/partition/ResultPartitionFactory.java  | 16 +++----
 .../runtime/shuffle/ShuffleEnvironmentContext.java | 10 +----
 .../runtime/taskexecutor/TaskManagerServices.java  |  9 ++--
 .../NettyShuffleEnvironmentConfiguration.java      | 23 ++++++++--
 .../runtime/io/disk/NoOpFileChannelManager.java    | 51 ++++++++++++++++++++++
 .../io/network/NettyShuffleEnvironmentBuilder.java | 17 ++++----
 .../io/network/NettyShuffleEnvironmentTest.java    | 23 +++++++++-
 .../partition/BoundedBlockingSubpartitionTest.java | 24 +++++++++-
 .../BoundedBlockingSubpartitionWriteReadTest.java  | 21 ++++++++-
 .../io/network/partition/PartitionTestUtils.java   | 21 +++++++++
 .../network/partition/ResultPartitionBuilder.java  | 12 ++---
 .../partition/ResultPartitionFactoryTest.java      | 22 +++++++++-
 .../io/network/partition/ResultPartitionTest.java  | 27 +++++++++++-
 .../StreamNetworkBenchmarkEnvironment.java         |  7 ---
 16 files changed, 250 insertions(+), 63 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
index 5171d75..17fb2cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
@@ -85,6 +86,8 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 
 	private final ResultPartitionManager resultPartitionManager;
 
+	private final FileChannelManager fileChannelManager;
+
 	private final Map<InputGateID, SingleInputGate> inputGatesById;
 
 	private final ResultPartitionFactory resultPartitionFactory;
@@ -99,6 +102,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 			NetworkBufferPool networkBufferPool,
 			ConnectionManager connectionManager,
 			ResultPartitionManager resultPartitionManager,
+			FileChannelManager fileChannelManager,
 			ResultPartitionFactory resultPartitionFactory,
 			SingleInputGateFactory singleInputGateFactory) {
 		this.taskExecutorResourceId = taskExecutorResourceId;
@@ -107,6 +111,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 		this.connectionManager = connectionManager;
 		this.resultPartitionManager = resultPartitionManager;
 		this.inputGatesById = new ConcurrentHashMap<>(10);
+		this.fileChannelManager = fileChannelManager;
 		this.resultPartitionFactory = resultPartitionFactory;
 		this.singleInputGateFactory = singleInputGateFactory;
 		this.isClosed = false;
@@ -326,6 +331,14 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 				LOG.warn("Network buffer pool did not shut down properly.", t);
 			}
 
+			// delete all the temp directories
+			try {
+				fileChannelManager.close();
+			}
+			catch (Throwable t) {
+				LOG.warn("Cannot close the file channel manager properly.", t);
+			}
+
 			isClosed = true;
 		}
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index 74dbe0f..be31fb7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -22,7 +22,8 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
@@ -45,6 +46,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettyShuffleDescriptor, ResultPartition, SingleInputGate> {
 
+	private static final String DIR_NAME_PREFIX = "netty-shuffle";
+
 	@Override
 	public NettyShuffleMaster createShuffleMaster(Configuration configuration) {
 		return NettyShuffleMaster.INSTANCE;
@@ -62,8 +65,7 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
 			networkConfig,
 			shuffleEnvironmentContext.getTaskExecutorResourceId(),
 			shuffleEnvironmentContext.getEventPublisher(),
-			shuffleEnvironmentContext.getParentMetricGroup(),
-			shuffleEnvironmentContext.getIOManager());
+			shuffleEnvironmentContext.getParentMetricGroup());
 	}
 
 	@VisibleForTesting
@@ -71,18 +73,18 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
 			NettyShuffleEnvironmentConfiguration config,
 			ResourceID taskExecutorResourceId,
 			TaskEventPublisher taskEventPublisher,
-			MetricGroup metricGroup,
-			IOManager ioManager) {
+			MetricGroup metricGroup) {
 		checkNotNull(config);
 		checkNotNull(taskExecutorResourceId);
 		checkNotNull(taskEventPublisher);
 		checkNotNull(metricGroup);
-		checkNotNull(ioManager);
 
 		NettyConfig nettyConfig = config.nettyConfig();
 
 		ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
 
+		FileChannelManager fileChannelManager = new FileChannelManagerImpl(config.getTempDirs(), DIR_NAME_PREFIX);
+
 		ConnectionManager connectionManager = nettyConfig != null ?
 			new NettyConnectionManager(resultPartitionManager, taskEventPublisher, nettyConfig, config.isCreditBased()) :
 			new LocalConnectionManager();
@@ -96,7 +98,7 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
 
 		ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory(
 			resultPartitionManager,
-			ioManager,
+			fileChannelManager,
 			networkBufferPool,
 			config.networkBuffersPerChannel(),
 			config.floatingNetworkBuffersPerGate());
@@ -115,6 +117,7 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
 			networkBufferPool,
 			connectionManager,
 			resultPartitionManager,
+			fileChannelManager,
 			resultPartitionFactory,
 			singleInputGateFactory);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index 137ea5f..aaaecf6 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
@@ -52,7 +52,7 @@ public class ResultPartitionFactory {
 	private final ResultPartitionManager partitionManager;
 
 	@Nonnull
-	private final IOManager ioManager;
+	private final FileChannelManager channelManager;
 
 	@Nonnull
 	private final BufferPoolFactory bufferPoolFactory;
@@ -63,13 +63,13 @@ public class ResultPartitionFactory {
 
 	public ResultPartitionFactory(
 		@Nonnull ResultPartitionManager partitionManager,
-		@Nonnull IOManager ioManager,
+		@Nonnull FileChannelManager channelManager,
 		@Nonnull BufferPoolFactory bufferPoolFactory,
 		int networkBuffersPerChannel,
 		int floatingNetworkBuffersPerGate) {
 
 		this.partitionManager = partitionManager;
-		this.ioManager = ioManager;
+		this.channelManager = channelManager;
 		this.networkBuffersPerChannel = networkBuffersPerChannel;
 		this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
 		this.bufferPoolFactory = bufferPoolFactory;
@@ -135,7 +135,7 @@ public class ResultPartitionFactory {
 		// Create the subpartitions.
 		switch (type) {
 			case BLOCKING:
-				initializeBoundedBlockingPartitions(subpartitions, partition, ioManager, networkBufferSize);
+				initializeBoundedBlockingPartitions(subpartitions, partition, networkBufferSize, channelManager);
 				break;
 
 			case PIPELINED:
@@ -154,13 +154,13 @@ public class ResultPartitionFactory {
 	private static void initializeBoundedBlockingPartitions(
 		ResultSubpartition[] subpartitions,
 		ResultPartition parent,
-		IOManager ioManager,
-		int networkBufferSize) {
+		int networkBufferSize,
+		FileChannelManager channelManager) {
 
 		int i = 0;
 		try {
 			for (; i < subpartitions.length; i++) {
-				final File spillFile = ioManager.createChannel().getPathFile();
+				final File spillFile = channelManager.createChannel().getPathFile();
 				subpartitions[i] = BOUNDED_BLOCKING_TYPE.create(i, parent, spillFile, networkBufferSize);
 			}
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
index bde82eb..7c1abc3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.shuffle;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.TaskEventPublisher;
 
 import java.net.InetAddress;
@@ -39,7 +38,6 @@ public class ShuffleEnvironmentContext {
 	private final InetAddress hostAddress;
 	private final TaskEventPublisher eventPublisher;
 	private final MetricGroup parentMetricGroup;
-	private final IOManager ioManager;
 
 	public ShuffleEnvironmentContext(
 			Configuration configuration,
@@ -48,8 +46,7 @@ public class ShuffleEnvironmentContext {
 			boolean localCommunicationOnly,
 			InetAddress hostAddress,
 			TaskEventPublisher eventPublisher,
-			MetricGroup parentMetricGroup,
-			IOManager ioManager) {
+			MetricGroup parentMetricGroup) {
 		this.configuration = checkNotNull(configuration);
 		this.taskExecutorResourceId = checkNotNull(taskExecutorResourceId);
 		this.maxJvmHeapMemory = maxJvmHeapMemory;
@@ -57,7 +54,6 @@ public class ShuffleEnvironmentContext {
 		this.hostAddress = checkNotNull(hostAddress);
 		this.eventPublisher = checkNotNull(eventPublisher);
 		this.parentMetricGroup = checkNotNull(parentMetricGroup);
-		this.ioManager = checkNotNull(ioManager);
 	}
 
 	public Configuration getConfiguration() {
@@ -87,8 +83,4 @@ public class ShuffleEnvironmentContext {
 	public MetricGroup getParentMetricGroup() {
 		return parentMetricGroup;
 	}
-
-	public IOManager getIOManager() {
-		return ioManager;
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 0aafce0..8de72f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -244,8 +244,7 @@ public class TaskManagerServices {
 		final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
 			taskManagerServicesConfiguration,
 			taskEventDispatcher,
-			taskManagerMetricGroup,
-			ioManager);
+			taskManagerMetricGroup);
 		final int dataPort = shuffleEnvironment.start();
 
 		final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
@@ -307,8 +306,7 @@ public class TaskManagerServices {
 	private static ShuffleEnvironment<?, ?> createShuffleEnvironment(
 			TaskManagerServicesConfiguration taskManagerServicesConfiguration,
 			TaskEventDispatcher taskEventDispatcher,
-			MetricGroup taskManagerMetricGroup,
-			IOManager ioManager) throws FlinkException {
+			MetricGroup taskManagerMetricGroup) throws FlinkException {
 
 		final ShuffleEnvironmentContext shuffleEnvironmentContext = new ShuffleEnvironmentContext(
 			taskManagerServicesConfiguration.getConfiguration(),
@@ -317,8 +315,7 @@ public class TaskManagerServices {
 			taskManagerServicesConfiguration.isLocalCommunicationOnly(),
 			taskManagerServicesConfiguration.getTaskManagerAddress(),
 			taskEventDispatcher,
-			taskManagerMetricGroup,
-			ioManager);
+			taskManagerMetricGroup);
 
 		return ShuffleServiceLoader
 			.loadShuffleServiceFactory(taskManagerServicesConfiguration.getConfiguration())
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index a6ff17b..daccd3e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
@@ -28,6 +29,7 @@ import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,6 +37,7 @@ import javax.annotation.Nullable;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.Arrays;
 
 /**
  * Configuration object for the network stack.
@@ -62,6 +65,8 @@ public class NettyShuffleEnvironmentConfiguration {
 
 	private final NettyConfig nettyConfig;
 
+	private final String[] tempDirs;
+
 	public NettyShuffleEnvironmentConfiguration(
 			int numNetworkBuffers,
 			int networkBufferSize,
@@ -71,7 +76,8 @@ public class NettyShuffleEnvironmentConfiguration {
 			int floatingNetworkBuffersPerGate,
 			boolean isCreditBased,
 			boolean isNetworkDetailedMetrics,
-			@Nullable NettyConfig nettyConfig) {
+			@Nullable NettyConfig nettyConfig,
+			String[] tempDirs) {
 
 		this.numNetworkBuffers = numNetworkBuffers;
 		this.networkBufferSize = networkBufferSize;
@@ -82,6 +88,7 @@ public class NettyShuffleEnvironmentConfiguration {
 		this.isCreditBased = isCreditBased;
 		this.isNetworkDetailedMetrics = isNetworkDetailedMetrics;
 		this.nettyConfig = nettyConfig;
+		this.tempDirs = Preconditions.checkNotNull(tempDirs);
 	}
 
 	// ------------------------------------------------------------------------
@@ -122,6 +129,10 @@ public class NettyShuffleEnvironmentConfiguration {
 		return isNetworkDetailedMetrics;
 	}
 
+	public String[] getTempDirs() {
+		return tempDirs;
+	}
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -158,6 +169,8 @@ public class NettyShuffleEnvironmentConfiguration {
 
 		boolean isNetworkDetailedMetrics = configuration.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_DETAILED_METRICS);
 
+		String[] tempDirs = ConfigurationUtils.parseTempDirectories(configuration);
+
 		return new NettyShuffleEnvironmentConfiguration(
 			numberOfNetworkBuffers,
 			pageSize,
@@ -167,7 +180,8 @@ public class NettyShuffleEnvironmentConfiguration {
 			extraBuffersPerGate,
 			isCreditBased,
 			isNetworkDetailedMetrics,
-			nettyConfig);
+			nettyConfig,
+			tempDirs);
 	}
 
 	/**
@@ -467,6 +481,7 @@ public class NettyShuffleEnvironmentConfiguration {
 		result = 31 * result + floatingNetworkBuffersPerGate;
 		result = 31 * result + (isCreditBased ? 1 : 0);
 		result = 31 * result + (nettyConfig != null ? nettyConfig.hashCode() : 0);
+		result = 31 * result + Arrays.hashCode(tempDirs);
 		return result;
 	}
 
@@ -488,7 +503,8 @@ public class NettyShuffleEnvironmentConfiguration {
 					this.networkBuffersPerChannel == that.networkBuffersPerChannel &&
 					this.floatingNetworkBuffersPerGate == that.floatingNetworkBuffersPerGate &&
 					this.isCreditBased == that.isCreditBased &&
-					(nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null);
+					(nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null) &&
+					Arrays.equals(this.tempDirs, that.tempDirs);
 		}
 	}
 
@@ -503,6 +519,7 @@ public class NettyShuffleEnvironmentConfiguration {
 				", floatingNetworkBuffersPerGate=" + floatingNetworkBuffersPerGate +
 				", isCreditBased=" + isCreditBased +
 				", nettyConfig=" + nettyConfig +
+				", tempDirs=" + Arrays.toString(tempDirs) +
 				'}';
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/NoOpFileChannelManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/NoOpFileChannelManager.java
new file mode 100644
index 0000000..4fcf3ab
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/NoOpFileChannelManager.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
+
+import java.io.File;
+
+/**
+ * A {@link FileChannelManager} that cannot do I/O but serves as a mock for tests.
+ */
+public enum NoOpFileChannelManager implements FileChannelManager {
+
+	INSTANCE;
+
+	@Override
+	public ID createChannel() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public Enumerator createChannelEnumerator() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public File[] getPaths() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void close() {
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index b0ef430..32b3744 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -20,17 +20,18 @@ package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 
 /**
  * Builder for the {@link NettyShuffleEnvironment}.
  */
 public class NettyShuffleEnvironmentBuilder {
 
+	private static final String[] DEFAULT_TEMP_DIRS = new String[] {EnvironmentInformation.getTemporaryFileDirectory()};
+
 	private int numNetworkBuffers = 1024;
 
 	private int networkBufferSize = 32 * 1024;
@@ -55,7 +56,7 @@ public class NettyShuffleEnvironmentBuilder {
 
 	private MetricGroup metricGroup = UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup();
 
-	private IOManager ioManager = new IOManagerAsync();
+	private String[] tempDirs = DEFAULT_TEMP_DIRS;
 
 	public NettyShuffleEnvironmentBuilder setTaskManagerLocation(ResourceID taskManagerLocation) {
 		this.taskManagerLocation = taskManagerLocation;
@@ -112,8 +113,8 @@ public class NettyShuffleEnvironmentBuilder {
 		return this;
 	}
 
-	public NettyShuffleEnvironmentBuilder setIOManager(IOManager ioManager) {
-		this.ioManager = ioManager;
+	public NettyShuffleEnvironmentBuilder setTempDirs(String[] tempDirs) {
+		this.tempDirs = tempDirs;
 		return this;
 	}
 
@@ -128,10 +129,10 @@ public class NettyShuffleEnvironmentBuilder {
 				floatingNetworkBuffersPerGate,
 				isCreditBased,
 				isNetworkDetailedMetrics,
-				nettyConfig),
+				nettyConfig,
+				tempDirs),
 			taskManagerLocation,
 			taskEventDispatcher,
-			metricGroup,
-			ioManager);
+			metricGroup);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
index d4f0727..11951f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -28,8 +30,11 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
 import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -53,6 +58,10 @@ import static org.powermock.api.mockito.PowerMockito.spy;
 @RunWith(Parameterized.class)
 public class NettyShuffleEnvironmentTest extends TestLogger {
 
+	private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();
+
+	private static FileChannelManager fileChannelManager;
+
 	@Parameterized.Parameter
 	public boolean enableCreditBasedFlowControl;
 
@@ -64,6 +73,16 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 	@Rule
 	public ExpectedException expectedException = ExpectedException.none();
 
+	@BeforeClass
+	public static void setUp() {
+		fileChannelManager = new FileChannelManagerImpl(new String[] {tempDir}, "testing");
+	}
+
+	@AfterClass
+	public static void shutdown() throws Exception {
+		fileChannelManager.close();
+	}
+
 	/**
 	 * Verifies that {@link Task#setupPartitionsAndGates(ResultPartitionWriter[], InputGate[])}} sets up (un)bounded buffer pool
 	 * instances for various types of input and output channels.
@@ -76,7 +95,7 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 
 		// result partitions
 		ResultPartition rp1 = createPartition(network, ResultPartitionType.PIPELINED, 2);
-		ResultPartition rp2 = createPartition(network, ResultPartitionType.BLOCKING, 2);
+		ResultPartition rp2 = createPartition(network, fileChannelManager, ResultPartitionType.BLOCKING, 2);
 		ResultPartition rp3 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 2);
 		ResultPartition rp4 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 8);
 		final ResultPartition[] resultPartitions = new ResultPartition[] {rp1, rp2, rp3, rp4};
@@ -179,7 +198,7 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 
 		// result partitions
 		ResultPartition rp1 = createPartition(network, ResultPartitionType.PIPELINED, 2);
-		ResultPartition rp2 = createPartition(network, ResultPartitionType.BLOCKING, 2);
+		ResultPartition rp2 = createPartition(network, fileChannelManager, ResultPartitionType.BLOCKING, 2);
 		ResultPartition rp3 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 2);
 		ResultPartition rp4 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 4);
 		final ResultPartition[] resultPartitions = new ResultPartition[] {rp1, rp2, rp3, rp4};
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
index e76cd1e..9bd0c4b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
@@ -18,8 +18,13 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -29,6 +34,7 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 
+import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -41,9 +47,23 @@ import static org.junit.Assert.fail;
  */
 public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase {
 
+	private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();
+
+	private static FileChannelManager fileChannelManager;
+
 	@ClassRule
 	public static final TemporaryFolder TMP_DIR = new TemporaryFolder();
 
+	@BeforeClass
+	public static void setUp() {
+		fileChannelManager = new FileChannelManagerImpl(new String[] {tempDir}, "testing");
+	}
+
+	@AfterClass
+	public static void shutdown() throws Exception {
+		fileChannelManager.close();
+	}
+
 	// ------------------------------------------------------------------------
 
 	@Test
@@ -74,14 +94,14 @@ public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase {
 
 	@Override
 	ResultSubpartition createSubpartition() throws Exception {
-		final ResultPartition resultPartition = PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING);
+		final ResultPartition resultPartition = createPartition(ResultPartitionType.BLOCKING, fileChannelManager);
 		return BoundedBlockingSubpartition.createWithMemoryMappedFile(
 				0, resultPartition, new File(TMP_DIR.newFolder(), "subpartition"));
 	}
 
 	@Override
 	ResultSubpartition createFailingWritesSubpartition() throws Exception {
-		final ResultPartition resultPartition = PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING);
+		final ResultPartition resultPartition = createPartition(ResultPartitionType.BLOCKING, fileChannelManager);
 
 		return new BoundedBlockingSubpartition(
 				0,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
index 359ad8d..67ab6fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
@@ -21,9 +21,14 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -48,6 +53,10 @@ import static org.junit.Assert.assertTrue;
 @RunWith(Parameterized.class)
 public class BoundedBlockingSubpartitionWriteReadTest {
 
+	private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();
+
+	private static FileChannelManager fileChannelManager;
+
 	@ClassRule
 	public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
 
@@ -71,6 +80,16 @@ public class BoundedBlockingSubpartitionWriteReadTest {
 	//  tests
 	// ------------------------------------------------------------------------
 
+	@BeforeClass
+	public static void setUp() {
+		fileChannelManager = new FileChannelManagerImpl(new String[] {tempDir}, "testing");
+	}
+
+	@AfterClass
+	public static void shutdown() throws Exception {
+		fileChannelManager.close();
+	}
+
 	@Test
 	public void testWriteAndReadData() throws Exception {
 		final int numLongs = 15_000_000; // roughly 115 MiBytes
@@ -188,7 +207,7 @@ public class BoundedBlockingSubpartitionWriteReadTest {
 	private BoundedBlockingSubpartition createSubpartition() throws IOException {
 		return type.create(
 				0,
-				PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING),
+				PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING, fileChannelManager),
 				new File(TMP_FOLDER.newFolder(), "partitiondata"),
 				BUFFER_SIZE);
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index 51eff94..e559659 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
@@ -50,12 +51,32 @@ public class PartitionTestUtils {
 		return new ResultPartitionBuilder().setResultPartitionType(type).build();
 	}
 
+	public static ResultPartition createPartition(ResultPartitionType type, FileChannelManager channelManager) {
+		return new ResultPartitionBuilder()
+			.setResultPartitionType(type)
+			.setFileChannelManager(channelManager)
+			.build();
+	}
+
+	public static ResultPartition createPartition(
+			NettyShuffleEnvironment environment,
+			ResultPartitionType partitionType,
+			int numChannels) {
+		return new ResultPartitionBuilder()
+			.setupBufferPoolFactoryFromNettyShuffleEnvironment(environment)
+			.setResultPartitionType(partitionType)
+			.setNumberOfSubpartitions(numChannels)
+			.build();
+	}
+
 	public static ResultPartition createPartition(
 			NettyShuffleEnvironment environment,
+			FileChannelManager channelManager,
 			ResultPartitionType partitionType,
 			int numChannels) {
 		return new ResultPartitionBuilder()
 			.setupBufferPoolFactoryFromNettyShuffleEnvironment(environment)
+			.setFileChannelManager(channelManager)
 			.setResultPartitionType(partitionType)
 			.setNumberOfSubpartitions(numChannels)
 			.build();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index bf403ac..3da8eb8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.NoOpFileChannelManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
@@ -44,7 +44,7 @@ public class ResultPartitionBuilder {
 
 	private ResultPartitionManager partitionManager = new ResultPartitionManager();
 
-	private IOManager ioManager = new IOManagerAsync();
+	private FileChannelManager channelManager = NoOpFileChannelManager.INSTANCE;
 
 	private NetworkBufferPool networkBufferPool = new NetworkBufferPool(1, 1, 1);
 
@@ -82,8 +82,8 @@ public class ResultPartitionBuilder {
 		return this;
 	}
 
-	public ResultPartitionBuilder setIOManager(IOManager ioManager) {
-		this.ioManager = ioManager;
+	public ResultPartitionBuilder setFileChannelManager(FileChannelManager channelManager) {
+		this.channelManager = channelManager;
 		return this;
 	}
 
@@ -122,7 +122,7 @@ public class ResultPartitionBuilder {
 	public ResultPartition build() {
 		ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory(
 			partitionManager,
-			ioManager,
+			channelManager,
 			networkBufferPool,
 			networkBuffersPerChannel,
 			floatingNetworkBuffersPerGate);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index 0c6848b..f2e0e58 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -18,15 +18,19 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.io.disk.iomanager.NoOpIOManager;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
@@ -38,6 +42,20 @@ import static org.hamcrest.MatcherAssert.assertThat;
  */
 public class ResultPartitionFactoryTest extends TestLogger {
 
+	private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();
+
+	private static FileChannelManager fileChannelManager;
+
+	@BeforeClass
+	public static void setUp() {
+		fileChannelManager = new FileChannelManagerImpl(new String[] {tempDir}, "testing");
+	}
+
+	@AfterClass
+	public static void shutdown() throws Exception {
+		fileChannelManager.close();
+	}
+
 	@Test
 	public void testConsumptionOnReleaseEnabled() {
 		final ResultPartition resultPartition = createResultPartition(ShuffleDescriptor.ReleaseType.AUTO);
@@ -53,7 +71,7 @@ public class ResultPartitionFactoryTest extends TestLogger {
 	private static ResultPartition createResultPartition(ShuffleDescriptor.ReleaseType releaseType) {
 		ResultPartitionFactory factory = new ResultPartitionFactory(
 			new ResultPartitionManager(),
-			new NoOpIOManager(),
+			fileChannelManager,
 			new NetworkBufferPool(1, 64, 1),
 			1,
 			1
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 2072a12..1024780 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -28,8 +30,11 @@ import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.taskmanager.ConsumableNotifyingResultPartitionWriterDecorator;
 import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -55,6 +60,20 @@ import static org.mockito.Mockito.verify;
  */
 public class ResultPartitionTest {
 
+	private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();
+
+	private static FileChannelManager fileChannelManager;
+
+	@BeforeClass
+	public static void setUp() {
+		fileChannelManager = new FileChannelManagerImpl(new String[] {tempDir}, "testing");
+	}
+
+	@AfterClass
+	public static void shutdown() throws Exception {
+		fileChannelManager.close();
+	}
+
 	/**
 	 * Tests the schedule or update consumers message sending behaviour depending on the relevant flags.
 	 */
@@ -107,6 +126,7 @@ public class ResultPartitionTest {
 			.isReleasedOnConsumption(false)
 			.setResultPartitionManager(manager)
 			.setResultPartitionType(ResultPartitionType.BLOCKING)
+			.setFileChannelManager(fileChannelManager)
 			.build();
 
 		manager.registerResultPartition(partition);
@@ -181,7 +201,8 @@ public class ResultPartitionTest {
 		ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
 		JobID jobId = new JobID();
 		TaskActions taskActions = new NoOpTaskActions();
-		ResultPartition partition = createPartition(partitionType);
+		ResultPartition partition = partitionType == ResultPartitionType.BLOCKING ?
+			createPartition(partitionType, fileChannelManager) : createPartition(partitionType);
 		ResultPartitionWriter consumableNotifyingPartitionWriter = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
 			Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
 			new ResultPartitionWriter[] {partition},
@@ -311,9 +332,11 @@ public class ResultPartitionTest {
 			TaskActions taskActions,
 			JobID jobId,
 			ResultPartitionConsumableNotifier notifier) {
+		ResultPartition partition = partitionType == ResultPartitionType.BLOCKING ?
+			createPartition(partitionType, fileChannelManager) : createPartition(partitionType);
 		return ConsumableNotifyingResultPartitionWriterDecorator.decorate(
 			Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
-			new ResultPartitionWriter[] {createPartition(partitionType)},
+			new ResultPartitionWriter[] {partition},
 			taskActions,
 			jobId,
 			notifier)[0];
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index b7d7430..0cdc658 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -24,8 +24,6 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -81,7 +79,6 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 
 	protected NettyShuffleEnvironment senderEnv;
 	protected NettyShuffleEnvironment receiverEnv;
-	protected IOManager ioManager;
 
 	protected int channels;
 	protected boolean broadcastMode = false;
@@ -142,8 +139,6 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 			receiverBufferPoolSize = Math.max(2048, writers * channels * 4);
 		}
 
-		ioManager = new IOManagerAsync();
-
 		senderEnv = createShuffleEnvironment(senderBufferPoolSize, config);
 		this.dataPort = senderEnv.start();
 		if (localMode && senderBufferPoolSize == receiverBufferPoolSize) {
@@ -168,7 +163,6 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 	public void tearDown() {
 		suppressExceptions(senderEnv::close);
 		suppressExceptions(receiverEnv::close);
-		suppressExceptions(ioManager::close);
 	}
 
 	public SerializingLongReceiver createReceiver() throws Exception {
@@ -223,7 +217,6 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 			.setResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED)
 			.setNumberOfSubpartitions(channels)
 			.setResultPartitionManager(environment.getResultPartitionManager())
-			.setIOManager(ioManager)
 			.setupBufferPoolFactoryFromNettyShuffleEnvironment(environment)
 			.build();
 


[flink] 03/06: [hotfix][runtime] Refactor IOManager#close and remove #isProperlyShutDown

commit 423e8a8afa4fd440ba2abb6c2b535f881ef84374
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Fri Jun 28 12:18:47 2019 +0800

    [hotfix][runtime] Refactor IOManager#close and remove #isProperlyShutDown
    
    Previously, IOManager#close ignored exceptions internally so that a failing close operation would not
    interrupt the remaining ones, and IOManager#isProperlyShutDown was then used to check whether any
    exception had occurred during the close process. By using IOUtils#closeAll to run all close operations
    and rethrow the collected exceptions at the end, we get the same effect, so the isProperlyShutDown
    method can be removed completely.
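
To illustrate the pattern this commit relies on, here is a minimal, self-contained Java sketch of the
suppress-and-rethrow semantics that IOUtils#closeAll provides. The closeAll helper below is a stand-in
written for illustration, not Flink's actual implementation; only the general behaviour (close every
resource, then rethrow the first failure with later ones attached as suppressed) mirrors the approach
described above.

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class CloseAllSketch {

	/** Closes every resource, then rethrows the first failure with later ones attached as suppressed. */
	static void closeAll(List<AutoCloseable> closeables) throws Exception {
		Exception firstException = null;
		for (AutoCloseable closeable : closeables) {
			try {
				closeable.close();
			} catch (Exception e) {
				if (firstException == null) {
					firstException = e;
				} else {
					firstException.addSuppressed(e);
				}
			}
		}
		if (firstException != null) {
			throw firstException;
		}
	}

	public static void main(String[] args) throws Exception {
		List<AutoCloseable> closeables = Arrays.asList(
			() -> System.out.println("shutting down writer threads"),
			() -> { throw new IOException("failed to clean up temp file directory"); },
			() -> System.out.println("removing spill directories"));

		// Every closeable runs; the IOException surfaces afterwards, so no separate
		// isProperlyShutDown-style check is needed to detect close failures.
		closeAll(closeables);
	}
}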
---
 .../flink/runtime/io/disk/iomanager/IOManager.java |  47 ++++-----
 .../runtime/io/disk/iomanager/IOManagerAsync.java  | 105 ++++++++++-----------
 .../flink/runtime/io/disk/ChannelViewsTest.java    |   5 +-
 .../runtime/io/disk/FileChannelStreamsITCase.java  |   3 +-
 .../flink/runtime/io/disk/SpillingBufferTest.java  |   5 +-
 .../AsynchronousBufferFileWriterTest.java          |   2 +-
 .../iomanager/AsynchronousFileIOChannelTest.java   |   2 +-
 .../BufferFileWriterFileSegmentReaderTest.java     |   2 +-
 .../disk/iomanager/BufferFileWriterReaderTest.java |   2 +-
 .../io/disk/iomanager/IOManagerAsyncTest.java      |   3 +-
 .../runtime/io/disk/iomanager/IOManagerITCase.java |   1 -
 .../runtime/io/disk/iomanager/IOManagerTest.java   |   2 +-
 .../runtime/operators/hash/HashTableITCase.java    |   5 +-
 .../hash/NonReusingHashJoinIteratorITCase.java     |   5 +-
 .../operators/hash/ReOpenableHashTableITCase.java  |   5 +-
 .../hash/ReOpenableHashTableTestBase.java          |   5 +-
 .../hash/ReusingHashJoinIteratorITCase.java        |   5 +-
 .../resettable/SpillingResettableIteratorTest.java |   5 +-
 ...pillingResettableMutableObjectIteratorTest.java |   5 +-
 .../AbstractSortMergeOuterJoinIteratorITCase.java  |   5 +-
 .../sort/CombiningUnilateralSortMergerITCase.java  |   5 +-
 .../runtime/operators/sort/ExternalSortITCase.java |   5 +-
 .../sort/ExternalSortLargeRecordsITCase.java       |   5 +-
 .../sort/FixedLengthRecordSorterTest.java          |   2 +-
 ...NonReusingSortMergeInnerJoinIteratorITCase.java |   5 +-
 .../ReusingSortMergeInnerJoinIteratorITCase.java   |   5 +-
 .../testutils/BinaryOperatorTestBase.java          |   3 +-
 .../operators/testutils/DriverTestBase.java        |   1 -
 .../operators/testutils/MockEnvironment.java       |   4 +-
 .../runtime/operators/testutils/TaskTestBase.java  |   2 +-
 .../operators/testutils/UnaryOperatorTestBase.java |   1 -
 .../operators/util/HashVsSortMiniBenchmark.java    |   5 +-
 .../runtime/taskexecutor/TaskExecutorTest.java     |   1 -
 .../streaming/runtime/io/BufferSpillerTest.java    |   2 +-
 ...CheckpointBarrierAlignerAlignmentLimitTest.java |   2 +-
 .../io/SpillingCheckpointBarrierAlignerTest.java   |   2 +-
 .../streaming/runtime/tasks/OperatorChainTest.java |   2 +-
 .../runtime/tasks/StreamTaskTestHarness.java       |   1 -
 .../util/AbstractStreamOperatorTestHarness.java    |   4 +-
 .../flink/table/runtime/aggregate/HashAggTest.java |   5 +-
 .../runtime/hashtable/BinaryHashTableTest.java     |   5 +-
 .../io/CompressedHeaderlessChannelTest.java        |   6 +-
 .../join/Int2SortMergeJoinOperatorTest.java        |   5 +-
 .../runtime/sort/BinaryExternalSorterTest.java     |   5 +-
 .../runtime/sort/BufferedKVExternalSorterTest.java |   5 +-
 45 files changed, 111 insertions(+), 196 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index ee54b1e..a649e42 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -24,15 +24,19 @@ import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.util.FileUtils;
 
+import org.apache.flink.util.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
 
 /**
  * The facade for the provided I/O manager services.
@@ -82,39 +86,26 @@ public abstract class IOManager implements AutoCloseable {
 	}
 
 	/**
-	 * Close method, marks the I/O manager as closed
-	 * and removed all temporary files.
+	 * Removes all temporary files.
 	 */
 	@Override
-	public void close() {
-		// remove all of our temp directories
-		for (File path : paths) {
-			try {
-				if (path != null) {
-					if (path.exists()) {
-						FileUtils.deleteDirectory(path);
-						LOG.info("I/O manager removed spill file directory {}", path.getAbsolutePath());
-					}
-				}
-			} catch (Throwable t) {
-				LOG.error("IOManager failed to properly clean up temp file directory: " + path, t);
-			}
-		}
+	public void close() throws Exception {
+		IOUtils.closeAll(Arrays.stream(paths)
+			.filter(File::exists)
+			.map(IOManager::getFileCloser)
+			.collect(Collectors.toList()));
 	}
 
-	/**
-	 * Utility method to check whether the IO manager has been properly shut down.
-	 * For this base implementation, this means that all files have been removed.
-	 *
-	 * @return True, if the IO manager has properly shut down, false otherwise.
-	 */
-	public boolean isProperlyShutDown() {
-		for (File path : paths) {
-			if (path != null && path.exists()) {
-				return false;
+	private static AutoCloseable getFileCloser(File path) {
+		return () -> {
+			try {
+				FileUtils.deleteDirectory(path);
+				LOG.info("I/O manager removed spill file directory {}", path.getAbsolutePath());
+			} catch (IOException e) {
+				String errorMessage = String.format("IOManager failed to properly clean up temp file directory: %s", path);
+				throw new IOException(errorMessage, e);
 			}
-		}
-		return true;
+		};
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
index ffa4dcf..2133430 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
@@ -21,10 +21,12 @@ package org.apache.flink.runtime.io.disk.iomanager;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.ShutdownHookUtil;
 
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -109,7 +111,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 	 * operation.
 	 */
 	@Override
-	public void close() {
+	public void close() throws Exception {
 		// mark shut down and exit if it already was shut down
 		if (!isShutdown.compareAndSet(false, true)) {
 			return;
@@ -118,30 +120,25 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 		// Remove shutdown hook to prevent resource leaks
 		ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
 
-		try {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Shutting down I/O manager.");
-			}
 
-			// close writing and reading threads with best effort and log problems
-			// first notify all to close, then wait until all are closed
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Shutting down I/O manager.");
+		}
 
-			for (WriterThread wt : writers) {
-				try {
-					wt.shutdown();
-				}
-				catch (Throwable t) {
-					LOG.error("Error while shutting down IO Manager writer thread.", t);
-				}
-			}
-			for (ReaderThread rt : readers) {
-				try {
-					rt.shutdown();
-				}
-				catch (Throwable t) {
-					LOG.error("Error while shutting down IO Manager reader thread.", t);
-				}
-			}
+		// close writing and reading threads with best effort and log problems
+		// first notify all to close, then wait until all are closed
+
+		List<AutoCloseable> closeables = new ArrayList<>(writers.length + readers.length + 2);
+
+		for (WriterThread wt : writers) {
+			closeables.add(getWriterThreadCloser(wt));
+		}
+
+		for (ReaderThread rt : readers) {
+			closeables.add(getReaderThreadCloser(rt));
+		}
+
+		closeables.add(() -> {
 			try {
 				for (WriterThread wt : writers) {
 					wt.join();
@@ -149,44 +146,46 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 				for (ReaderThread rt : readers) {
 					rt.join();
 				}
+			} catch (InterruptedException ie) {
+				Thread.currentThread().interrupt();
 			}
-			catch (InterruptedException iex) {
-				// ignore this on shutdown
-			}
-		}
-		finally {
-			// make sure we call the super implementation in any case and at the last point,
-			// because this will clean up the I/O directories
-			super.close();
-		}
+		});
+
+		// make sure we call the super implementation in any case and at the last point,
+		// because this will clean up the I/O directories
+		closeables.add(super::close);
+
+		IOUtils.closeAll(closeables);
 	}
-	
-	/**
-	 * Utility method to check whether the IO manager has been properly shut down. The IO manager is considered
-	 * to be properly shut down when it is closed and its threads have ceased operation.
-	 * 
-	 * @return True, if the IO manager has properly shut down, false otherwise.
-	 */
-	@Override
-	public boolean isProperlyShutDown() {
-		boolean readersShutDown = true;
-		for (ReaderThread rt : readers) {
-			readersShutDown &= rt.getState() == Thread.State.TERMINATED;
-		}
-		
-		boolean writersShutDown = true;
-		for (WriterThread wt : writers) {
-			writersShutDown &= wt.getState() == Thread.State.TERMINATED;
-		}
-		
-		return isShutdown.get() && readersShutDown && writersShutDown && super.isProperlyShutDown();
+
+	private static AutoCloseable getWriterThreadCloser(WriterThread thread) {
+		return () -> {
+			try {
+				thread.shutdown();
+			} catch (Throwable t) {
+				throw new IOException("Error while shutting down IO Manager writer thread.", t);
+			}
+		};
 	}
 
+	private static AutoCloseable getReaderThreadCloser(ReaderThread thread) {
+		return () -> {
+			try {
+				thread.shutdown();
+			} catch (Throwable t) {
+				throw new IOException("Error while shutting down IO Manager reader thread.", t);
+			}
+		};
+	}
 
 	@Override
 	public void uncaughtException(Thread t, Throwable e) {
 		LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Shutting down I/O Manager.", e);
-		close();
+		try {
+			close();
+		} catch (Exception ex) {
+			LOG.warn("IOManagerAsync did not shut down properly.", ex);
+		}
 	}
 	
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
index a617109..4e94de2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
@@ -82,11 +82,8 @@ public class ChannelViewsTest
 	}
 
 	@After
-	public void afterTest() {
+	public void afterTest() throws Exception {
 		this.ioManager.close();
-		if (!this.ioManager.isProperlyShutDown()) {
-			Assert.fail("I/O Manager was not properly shut down.");
-		}
 		
 		if (memoryManager != null) {
 			Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.", 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
index 44fe849..c83e942 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
@@ -73,9 +73,8 @@ public class FileChannelStreamsITCase extends TestLogger {
 	}
 
 	@After
-	public void afterTest() {
+	public void afterTest() throws Exception {
 		ioManager.close();
-		assertTrue("I/O Manager was not properly shut down.", ioManager.isProperlyShutDown());
 		assertTrue("The memory has not been properly released", memManager.verifyEmpty());
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
index f94735c..d7d2f28 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
@@ -71,11 +71,8 @@ public class SpillingBufferTest {
 	}
 
 	@After
-	public void afterTest() {
+	public void afterTest() throws Exception {
 		ioManager.close();
-		if (!ioManager.isProperlyShutDown()) {
-			Assert.fail("I/O Manager was not properly shut down.");
-		}
 		
 		if (memoryManager != null) {
 			Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.", 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
index 5ef3983..a8e8d81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
@@ -58,7 +58,7 @@ public class AsynchronousBufferFileWriterTest {
 	private AsynchronousBufferFileWriter writer;
 
 	@AfterClass
-	public static void shutdown() {
+	public static void shutdown() throws Exception {
 		ioManager.close();
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
index ead6490..5bf4db4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
@@ -316,7 +316,7 @@ public class AsynchronousFileIOChannelTest {
 	}
 
 	@Test
-	public void testExceptionForwardsToClose() {
+	public void testExceptionForwardsToClose() throws Exception {
 		try (IOManagerAsync ioMan = new IOManagerAsync()) {
 			testExceptionForwardsToClose(ioMan, 100, 1);
 			testExceptionForwardsToClose(ioMan, 100, 50);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
index d2c388c..88f241e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
@@ -60,7 +60,7 @@ public class BufferFileWriterFileSegmentReaderTest {
 	private LinkedBlockingQueue<FileSegment> returnedFileSegments = new LinkedBlockingQueue<>();
 
 	@AfterClass
-	public static void shutdown() {
+	public static void shutdown() throws Exception {
 		ioManager.close();
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
index 78026bf..0eab18d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
@@ -57,7 +57,7 @@ public class BufferFileWriterReaderTest {
 	private LinkedBlockingQueue<Buffer> returnedBuffers = new LinkedBlockingQueue<>();
 
 	@AfterClass
-	public static void shutdown() {
+	public static void shutdown() throws Exception {
 		ioManager.close();
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
index a4d3795..b0ce877 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
@@ -46,9 +46,8 @@ public class IOManagerAsyncTest {
 	}
 
 	@After
-	public void afterTest() {
+	public void afterTest() throws Exception {
 		this.ioManager.close();
-		assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown());
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
index 4a3f13f..83e15be 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
@@ -66,7 +66,6 @@ public class IOManagerITCase extends TestLogger {
 	@After
 	public void afterTest() throws Exception {
 		ioManager.close();
-		Assert.assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown());
 		
 		Assert.assertTrue("Not all memory was returned to the memory manager in the test.", memoryManager.verifyEmpty());
 		memoryManager.shutdown();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
index ce16bba..4c0e6a2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
@@ -40,7 +40,7 @@ public class IOManagerTest {
 	public final TemporaryFolder  temporaryFolder = new TemporaryFolder();
 
 	@Test
-	public void channelEnumerator() throws IOException {
+	public void channelEnumerator() throws Exception {
 		File tempPath = temporaryFolder.newFolder();
 
 		String[] tempDirs = new String[]{
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
index cde6d89..1b86cec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
@@ -100,13 +100,10 @@ public class HashTableITCase extends TestLogger {
 	}
 	
 	@After
-	public void tearDown()
+	public void tearDown() throws Exception
 	{
 		// shut down I/O manager and Memory Manager and verify the correct shutdown
 		this.ioManager.close();
-		if (!this.ioManager.isProperlyShutDown()) {
-			fail("I/O manager was not property shut down.");
-		}
 		if (!this.memManager.verifyEmpty()) {
 			fail("Not all memory was properly released to the memory manager --> Memory Leak.");
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java
index 4512ee8..553fd24 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java
@@ -103,12 +103,9 @@ public class NonReusingHashJoinIteratorITCase extends TestLogger {
 	}
 
 	@After
-	public void afterTest() {
+	public void afterTest() throws Exception {
 		if (this.ioManager != null) {
 			this.ioManager.close();
-			if (!this.ioManager.isProperlyShutDown()) {
-				Assert.fail("I/O manager failed to properly shut down.");
-			}
 			this.ioManager = null;
 		}
 		
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
index 754da40..95f36e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
@@ -81,12 +81,9 @@ public class ReOpenableHashTableITCase extends TestLogger {
 	}
 
 	@After
-	public void afterTest() {
+	public void afterTest() throws Exception {
 		if (this.ioManager != null) {
 			this.ioManager.close();
-			if (!this.ioManager.isProperlyShutDown()) {
-				Assert.fail("I/O manager failed to properly shut down.");
-			}
 			this.ioManager = null;
 		}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java
index e6ac58a..9acd87a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java
@@ -92,12 +92,9 @@ public abstract class ReOpenableHashTableTestBase extends TestLogger {
 	}
 
 	@After
-	public void afterTest() {
+	public void afterTest() throws Exception {
 		if (this.ioManager != null) {
 			this.ioManager.close();
-			if (!this.ioManager.isProperlyShutDown()) {
-				Assert.fail("I/O manager failed to properly shut down.");
-			}
 			this.ioManager = null;
 		}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java
index c199e38..9a69d89 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java
@@ -111,12 +111,9 @@ public class ReusingHashJoinIteratorITCase extends TestLogger {
 	}
 
 	@After
-	public void afterTest() {
+	public void afterTest() throws Exception {
 		if (this.ioManager != null) {
 			this.ioManager.close();
-			if (!this.ioManager.isProperlyShutDown()) {
-				Assert.fail("I/O manager failed to properly shut down.");
-			}
 			this.ioManager = null;
 		}
 		
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
index 3f5a25d..15c32fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
@@ -71,11 +71,8 @@ public class SpillingResettableIteratorTest {
 	}
 
 	@After
-	public void shutdown() {
+	public void shutdown() throws Exception {
 		this.ioman.close();
-		if (!this.ioman.isProperlyShutDown()) {
-			Assert.fail("I/O Manager Shutdown was not completed properly.");
-		}
 		this.ioman = null;
 
 		if (!this.memman.verifyEmpty()) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
index 2a69a63..c442e7d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
@@ -68,11 +68,8 @@ public class SpillingResettableMutableObjectIteratorTest {
 	}
 
 	@After
-	public void shutdown() {
+	public void shutdown() throws Exception {
 		this.ioman.close();
-		if (!this.ioman.isProperlyShutDown()) {
-			Assert.fail("I/O Manager Shutdown was not completed properly.");
-		}
 		this.ioman = null;
 
 		if (!this.memman.verifyEmpty()) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
index 7cdf07f..364dc32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
@@ -109,12 +109,9 @@ public abstract class AbstractSortMergeOuterJoinIteratorITCase extends TestLogge
 	}
 
 	@After
-	public void afterTest() {
+	public void afterTest() throws Exception {
 		if (this.ioManager != null) {
 			this.ioManager.close();
-			if (!this.ioManager.isProperlyShutDown()) {
-				Assert.fail("I/O manager failed to properly shut down.");
-			}
 			this.ioManager = null;
 		}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
index 96831f1..e50798d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
@@ -92,11 +92,8 @@ public class CombiningUnilateralSortMergerITCase extends TestLogger {
 	}
 
 	@After
-	public void afterTest() {
+	public void afterTest() throws Exception {
 		this.ioManager.close();
-		if (!this.ioManager.isProperlyShutDown()) {
-			Assert.fail("I/O Manager was not properly shut down.");
-		}
 		
 		if (this.memoryManager != null) {
 			Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.", 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
index d1aa0a7..691a065 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
@@ -85,11 +85,8 @@ public class ExternalSortITCase extends TestLogger {
 	}
 
 	@After
-	public void afterTest() {
+	public void afterTest() throws Exception {
 		this.ioManager.close();
-		if (!this.ioManager.isProperlyShutDown()) {
-			Assert.fail("I/O Manager was not properly shut down.");
-		}
 		
 		if (this.memoryManager != null && testSuccess) {
 			Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.",
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
index 335dbee..48d6ce7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
@@ -71,11 +71,8 @@ public class ExternalSortLargeRecordsITCase extends TestLogger {
 	}
 
 	@After
-	public void afterTest() {
+	public void afterTest() throws Exception {
 		this.ioManager.close();
-		if (!this.ioManager.isProperlyShutDown()) {
-			Assert.fail("I/O Manager was not properly shut down.");
-		}
 		
 		if (this.memoryManager != null && testSuccess) {
 			Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.", 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
index 2f05c78..fc33ffa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
@@ -73,7 +73,7 @@ public class FixedLengthRecordSorterTest {
 	}
 
 	@After
-	public void afterTest() {
+	public void afterTest() throws Exception {
 		if (!this.memoryManager.verifyEmpty()) {
 			Assert.fail("Memory Leak: Some memory has not been returned to the memory manager.");
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
index c729a3a..cc2736d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
@@ -110,12 +110,9 @@ public class NonReusingSortMergeInnerJoinIteratorITCase extends TestLogger {
 	}
 
 	@After
-	public void afterTest() {
+	public void afterTest() throws Exception {
 		if (this.ioManager != null) {
 			this.ioManager.close();
-			if (!this.ioManager.isProperlyShutDown()) {
-				Assert.fail("I/O manager failed to properly shut down.");
-			}
 			this.ioManager = null;
 		}
 		
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
index d149084..e6e3828 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
@@ -110,12 +110,9 @@ public class ReusingSortMergeInnerJoinIteratorITCase extends TestLogger {
 	}
 
 	@After
-	public void afterTest() {
+	public void afterTest() throws Exception {
 		if (this.ioManager != null) {
 			this.ioManager.close();
-			if (!this.ioManager.isProperlyShutDown()) {
-				Assert.fail("I/O manager failed to properly shut down.");
-			}
 			this.ioManager = null;
 		}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
index d2a4f6a..1ffac3f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
@@ -389,8 +389,7 @@ public abstract class BinaryOperatorTestBase<S extends Function, IN, OUT> extend
 		
 		// 2nd, shutdown I/O
 		this.ioManager.close();
-		Assert.assertTrue("I/O Manager has not properly shut down.", this.ioManager.isProperlyShutDown());
-		
+
 		// last, verify all memory is returned and shutdown mem manager
 		MemoryManager memMan = getMemoryManager();
 		if (memMan != null) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index e1d2cb9..ec106ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -386,7 +386,6 @@ public abstract class DriverTestBase<S extends Function> extends TestLogger impl
 		
 		// 2nd, shutdown I/O
 		this.ioManager.close();
-		Assert.assertTrue("I/O Manager has not properly shut down.", this.ioManager.isProperlyShutDown());
 
 		// last, verify all memory is returned and shutdown mem manager
 		MemoryManager memMan = getMemoryManager();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 2c96ca4..d3d462f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -336,7 +336,7 @@ public class MockEnvironment implements Environment, AutoCloseable {
 	}
 
 	@Override
-	public void close() {
+	public void close() throws Exception {
 		// close() method should be idempotent and calling memManager.verifyEmpty() will throw after it was shutdown.
 		if (!memManager.isShutdown()) {
 			checkState(memManager.verifyEmpty(), "Memory Manager managed memory was not completely freed.");
@@ -344,8 +344,6 @@ public class MockEnvironment implements Environment, AutoCloseable {
 
 		memManager.shutdown();
 		ioManager.close();
-
-		checkState(ioManager.isProperlyShutDown(), "IO Manager has not properly shut down.");
 	}
 
 	public void setExpectedExternalFailureCause(Class<Throwable> expectedThrowableClass) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index 1a46539..16839aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -156,7 +156,7 @@ public abstract class TaskTestBase extends TestLogger {
 	}
 
 	@After
-	public void shutdown() {
+	public void shutdown() throws Exception {
 		mockEnv.close();
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
index 1ff50b0..73ea30c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -379,7 +379,6 @@ public abstract class UnaryOperatorTestBase<S extends Function, IN, OUT> extends
 		
 		// 2nd, shutdown I/O
 		this.ioManager.close();
-		Assert.assertTrue("I/O Manager has not properly shut down.", this.ioManager.isProperlyShutDown());
 
 		// last, verify all memory is returned and shutdown mem manager
 		MemoryManager memMan = getMemoryManager();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index 7b4eaba..a9581f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -97,7 +97,7 @@ public class HashVsSortMiniBenchmark {
 	}
 
 	@After
-	public void afterTest() {
+	public void afterTest() throws Exception {
 		if (this.memoryManager != null) {
 			Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
 				this.memoryManager.verifyEmpty());
@@ -107,9 +107,6 @@ public class HashVsSortMiniBenchmark {
 		
 		if (this.ioManager != null) {
 			this.ioManager.close();
-			if (!this.ioManager.isProperlyShutDown()) {
-				Assert.fail("I/O manager failed to properly shut down.");
-			}
 			this.ioManager = null;
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index e67e979..c7b8e12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -306,7 +306,6 @@ public class TaskExecutorTest extends TestLogger {
 
 		assertThat(memoryManager.isShutdown(), is(true));
 		assertThat(nettyShuffleEnvironment.isClosed(), is(true));
-		assertThat(ioManager.isProperlyShutDown(), is(true));
 		assertThat(kvStateService.isShutdown(), is(true));
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
index d19b99c..ead6bd8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
@@ -54,7 +54,7 @@ public class BufferSpillerTest extends BufferStorageTestBase {
 	}
 
 	@AfterClass
-	public static void shutdownIOManager() {
+	public static void shutdownIOManager() throws Exception {
 		ioManager.close();
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
index 535dfc0..341e291 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
@@ -77,7 +77,7 @@ public class CheckpointBarrierAlignerAlignmentLimitTest {
 	}
 
 	@AfterClass
-	public static void shutdownIOManager() {
+	public static void shutdownIOManager() throws Exception {
 		ioManager.close();
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java
index 0d28325..c95e1f2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java
@@ -48,7 +48,7 @@ public class SpillingCheckpointBarrierAlignerTest extends CheckpointBarrierAlign
 	}
 
 	@AfterClass
-	public static void shutdownIOManager() {
+	public static void shutdownIOManager() throws Exception {
 		ioManager.close();
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
index 8b1b183..e7bf45a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
@@ -72,7 +72,7 @@ public class OperatorChainTest {
 
 	@SafeVarargs
 	private static <T, OP extends StreamOperator<T>> OperatorChain<T, OP> setupOperatorChain(
-			OneInputStreamOperator<T, T>... operators) {
+			OneInputStreamOperator<T, T>... operators) throws Exception {
 
 		checkNotNull(operators);
 		checkArgument(operators.length > 0);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index c059757..2049da1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -326,7 +326,6 @@ public class StreamTaskTestHarness<OUT> {
 
 	private void shutdownIOManager() throws Exception {
 		this.mockEnv.getIOManager().close();
-		Assert.assertTrue("IO Manager has not properly shut down.", this.mockEnv.getIOManager().isProperlyShutDown());
 	}
 
 	private void shutdownMemoryManager() throws Exception {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 3ab5390..ce561d4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -523,7 +523,9 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 		}
 		setupCalled = false;
 
-		internalEnvironment.ifPresent(MockEnvironment::close);
+		if (internalEnvironment.isPresent()) {
+			internalEnvironment.get().close();
+		}
 	}
 
 	public void setProcessingTime(long time) throws Exception {
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/aggregate/HashAggTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/aggregate/HashAggTest.java
index f35a40c..5a74016 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/aggregate/HashAggTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/aggregate/HashAggTest.java
@@ -94,11 +94,8 @@ public class HashAggTest {
 	}
 
 	@After
-	public void afterTest() {
+	public void afterTest() throws Exception {
 		this.ioManager.close();
-		if (!this.ioManager.isProperlyShutDown()) {
-			Assert.fail("I/O Manager was not properly shut down.");
-		}
 
 		if (this.memoryManager != null) {
 			Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.",
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java
index de4e949..cf509ed 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java
@@ -93,12 +93,9 @@ public class BinaryHashTableTest {
 	}
 
 	@After
-	public void tearDown() {
+	public void tearDown() throws Exception {
 		// shut down I/O manager and Memory Manager and verify the correct shutdown
 		this.ioManager.close();
-		if (!this.ioManager.isProperlyShutDown()) {
-			fail("I/O manager was not property shut down.");
-		}
 	}
 
 	@Test
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java
index 14039c8..6f886d6 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
 import org.apache.flink.table.runtime.compression.Lz4BlockCompressionFactory;
 
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -50,11 +49,8 @@ public class CompressedHeaderlessChannelTest {
 	}
 
 	@After
-	public void afterTest() {
+	public void afterTest() throws Exception {
 		this.ioManager.close();
-		if (!this.ioManager.isProperlyShutDown()) {
-			Assert.fail("I/O Manager was not properly shut down.");
-		}
 	}
 
 	@Test
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/join/Int2SortMergeJoinOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/join/Int2SortMergeJoinOperatorTest.java
index 7b47903..c8c9bdb 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/join/Int2SortMergeJoinOperatorTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/join/Int2SortMergeJoinOperatorTest.java
@@ -76,12 +76,9 @@ public class Int2SortMergeJoinOperatorTest {
 	}
 
 	@After
-	public void tearDown() {
+	public void tearDown() throws Exception {
 		// shut down I/O manager and Memory Manager and verify the correct shutdown
 		this.ioManager.close();
-		if (!this.ioManager.isProperlyShutDown()) {
-			fail("I/O manager was not property shut down.");
-		}
 		if (!this.memManager.verifyEmpty()) {
 			fail("Not all memory was properly released to the memory manager --> Memory Leak.");
 		}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java
index 8eb62d0..7bbf45c 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java
@@ -100,11 +100,8 @@ public class BinaryExternalSorterTest {
 	}
 
 	@After
-	public void afterTest() {
+	public void afterTest() throws Exception {
 		this.ioManager.close();
-		if (!this.ioManager.isProperlyShutDown()) {
-			Assert.fail("I/O Manager was not properly shut down.");
-		}
 
 		if (this.memoryManager != null) {
 			Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.",
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java
index e095df1..069d40e 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java
@@ -102,11 +102,8 @@ public class BufferedKVExternalSorterTest {
 	}
 
 	@After
-	public void afterTest() {
+	public void afterTest() throws Exception {
 		this.ioManager.close();
-		if (!this.ioManager.isProperlyShutDown()) {
-			Assert.fail("I/O Manager was not properly shut down.");
-		}
 	}
 
 	@Test


[flink] 01/06: [hotfix][runtime] Cleanup IOManager code

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cdbfb82ef3eaa242abf6d070463c0895ac244ef1
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Wed Jun 19 12:36:54 2019 +0800

    [hotfix][runtime] Cleanup IOManager code
---
 .../runtime/io/disk/iomanager/FileIOChannel.java   | 78 ++++++++++----------
 .../flink/runtime/io/disk/iomanager/IOManager.java | 83 +++++++++++-----------
 .../IOManagerAsyncWithNoOpBufferFileWriter.java    | 53 --------------
 .../operators/sort/LargeRecordHandlerITCase.java   | 26 ++++---
 4 files changed, 91 insertions(+), 149 deletions(-)
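
Note: besides formatting, this cleanup turns IOManager#deleteChannel into a static utility that logs a warning instead of throwing, so call sites can drop their try/catch (see the LargeRecordHandlerITCase hunk below). A minimal sketch of the resulting usage; the sketch class name and the elided read/write steps are illustrative only:

import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;

public class DeleteChannelSketch {
	public static void main(String[] args) {
		// The default constructor spills to the JVM temp directory.
		IOManager ioManager = new IOManagerAsync();
		try {
			// Channels are spread round-robin across the spilling directories.
			FileIOChannel.ID channel = ioManager.createChannel();

			// ... create a writer/reader for the channel and use it ...

			// Static and non-throwing after this commit; a failed delete is only logged.
			IOManager.deleteChannel(channel);
		} finally {
			ioManager.shutdown(); // renamed to close() later in this series
		}
	}
}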

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
index fd8e8e6..ef57e03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
@@ -18,93 +18,90 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
+import org.apache.flink.util.StringUtils;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.flink.util.StringUtils;
-
 /**
  * A Channel represents a collection of files that belong logically to the same resource. An example is a collection of
  * files that contain sorted runs of data from the same stream, that will later on be merged together.
  */
 public interface FileIOChannel {
-	
+
 	/**
 	 * Gets the channel ID of this I/O channel.
-	 * 
+	 *
 	 * @return The channel ID.
 	 */
-	FileIOChannel.ID getChannelID();
-	
+	ID getChannelID();
+
 	/**
 	 * Gets the size (in bytes) of the file underlying the channel.
-	 * 
-	 * @return The size (in bytes) of the file underlying the channel.
 	 */
 	long getSize() throws IOException;
-	
+
 	/**
 	 * Checks whether the channel has been closed.
-	 * 
+	 *
 	 * @return True if the channel has been closed, false otherwise.
 	 */
 	boolean isClosed();
 
 	/**
-	* Closes the channel. For asynchronous implementations, this method waits until all pending requests are
-	* handled. Even if an exception interrupts the closing, the underlying <tt>FileChannel</tt> is closed.
-	* 
-	* @throws IOException Thrown, if an error occurred while waiting for pending requests.
-	*/
+	 * Closes the channel. For asynchronous implementations, this method waits until all pending requests are
+	 * handled. Even if an exception interrupts the closing, the underlying <tt>FileChannel</tt> is closed.
+	 *
+	 * @throws IOException Thrown, if an error occurred while waiting for pending requests.
+	 */
 	void close() throws IOException;
 
 	/**
 	 * Deletes the file underlying this I/O channel.
-	 *  
+	 *
 	 * @throws IllegalStateException Thrown, when the channel is still open.
 	 */
 	void deleteChannel();
-	
-	/**
-	* Closes the channel and deletes the underlying file.
-	* For asynchronous implementations, this method waits until all pending requests are handled;
-	* 
-	* @throws IOException Thrown, if an error occurred while waiting for pending requests.
-	*/
-	public void closeAndDelete() throws IOException;
 
 	FileChannel getNioFileChannel();
-	
+
+	/**
+	 * Closes the channel and deletes the underlying file. For asynchronous implementations,
+	 * this method waits until all pending requests are handled.
+	 *
+	 * @throws IOException Thrown, if an error occurred while waiting for pending requests.
+	 */
+	void closeAndDelete() throws IOException;
+
 	// --------------------------------------------------------------------------------------------
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * An ID identifying an underlying file channel.
 	 */
-	public static class ID {
-		
+	class ID {
+
 		private static final int RANDOM_BYTES_LENGTH = 16;
-		
+
 		private final File path;
-		
+
 		private final int threadNum;
 
-		protected ID(File path, int threadNum) {
+		private ID(File path, int threadNum) {
 			this.path = path;
 			this.threadNum = threadNum;
 		}
 
-		protected ID(File basePath, int threadNum, Random random) {
+		public ID(File basePath, int threadNum, Random random) {
 			this.path = new File(basePath, randomString(random) + ".channel");
 			this.threadNum = threadNum;
 		}
 
 		/**
 		 * Returns the path to the underlying temporary file.
-		 * @return The path to the underlying temporary file..
 		 */
 		public String getPath() {
 			return path.getAbsolutePath();
@@ -112,12 +109,11 @@ public interface FileIOChannel {
 
 		/**
 		 * Returns the path to the underlying temporary file as a File.
-		 * @return The path to the underlying temporary file as a File.
 		 */
 		public File getPathFile() {
 			return path;
 		}
-		
+
 		int getThreadNum() {
 			return this.threadNum;
 		}
@@ -131,17 +127,17 @@ public interface FileIOChannel {
 				return false;
 			}
 		}
-		
+
 		@Override
 		public int hashCode() {
 			return path.hashCode();
 		}
-		
+
 		@Override
 		public String toString() {
 			return path.getAbsolutePath();
 		}
-		
+
 		private static String randomString(Random random) {
 			byte[] bytes = new byte[RANDOM_BYTES_LENGTH];
 			random.nextBytes(bytes);
@@ -152,7 +148,7 @@ public interface FileIOChannel {
 	/**
 	 * An enumerator for channels that logically belong together.
 	 */
-	public static final class Enumerator {
+	final class Enumerator {
 
 		private static AtomicInteger globalCounter = new AtomicInteger();
 
@@ -162,7 +158,7 @@ public interface FileIOChannel {
 
 		private int localCounter;
 
-		protected Enumerator(File[] basePaths, Random random) {
+		public Enumerator(File[] basePaths, Random random) {
 			this.paths = basePaths;
 			this.namePrefix = ID.randomString(random);
 			this.localCounter = 0;
@@ -177,4 +173,4 @@ public interface FileIOChannel {
 			return new ID(new File(paths[threadNum], filename), threadNum);
 		}
 	}
-}
\ No newline at end of file
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index 0aaadf0..6723597 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.io.disk.iomanager;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.util.FileUtils;
 
@@ -36,10 +38,9 @@ import java.util.concurrent.LinkedBlockingQueue;
  * The facade for the provided I/O manager services.
  */
 public abstract class IOManager {
-	/** Logging */
 	protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
 
-	/** The temporary directories for files */
+	/** The temporary directories for files. */
 	private final File[] paths;
 
 	/** A random number generator for the anonymous ChannelIDs. */
@@ -120,41 +121,40 @@ public abstract class IOManager {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Creates a new {@link FileIOChannel.ID} in one of the temp directories. Multiple
-	 * invocations of this method spread the channels evenly across the different directories.
+	 * Creates a new {@link ID} in one of the temp directories. Multiple invocations of this
+	 * method spread the channels evenly across the different directories.
 	 *
 	 * @return A channel to a temporary directory.
 	 */
-	public FileIOChannel.ID createChannel() {
+	public ID createChannel() {
 		final int num = getNextPathNum();
-		return new FileIOChannel.ID(this.paths[num], num, this.random);
+		return new ID(this.paths[num], num, this.random);
 	}
 
 	/**
-	 * Creates a new {@link FileIOChannel.Enumerator}, spreading the channels in a round-robin fashion
+	 * Creates a new {@link Enumerator}, spreading the channels in a round-robin fashion
 	 * across the temporary file directories.
 	 *
 	 * @return An enumerator for channels.
 	 */
-	public FileIOChannel.Enumerator createChannelEnumerator() {
-		return new FileIOChannel.Enumerator(this.paths, this.random);
+	public Enumerator createChannelEnumerator() {
+		return new Enumerator(this.paths, this.random);
 	}
 
 	/**
 	 * Deletes the file underlying the given channel. If the channel is still open, this
 	 * call may fail.
-	 * 
+	 *
 	 * @param channel The channel to be deleted.
-	 * @throws IOException Thrown if the deletion fails.
 	 */
-	public void deleteChannel(FileIOChannel.ID channel) throws IOException {
+	public static void deleteChannel(ID channel) {
 		if (channel != null) {
 			if (channel.getPathFile().exists() && !channel.getPathFile().delete()) {
 				LOG.warn("IOManager failed to delete temporary file {}", channel.getPath());
 			}
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//                        Reader / Writer instantiations
 	// ------------------------------------------------------------------------
@@ -167,8 +167,8 @@ public abstract class IOManager {
 	 * @return A block channel writer that writes to the given channel.
 	 * @throws IOException Thrown, if the channel for the writer could not be opened.
 	 */
-	public BlockChannelWriter<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channelID) throws IOException {
-		return createBlockChannelWriter(channelID, new LinkedBlockingQueue<MemorySegment>());
+	public BlockChannelWriter<MemorySegment> createBlockChannelWriter(ID channelID) throws IOException {
+		return createBlockChannelWriter(channelID, new LinkedBlockingQueue<>());
 	}
 
 	/**
@@ -180,8 +180,9 @@ public abstract class IOManager {
 	 * @return A block channel writer that writes to the given channel.
 	 * @throws IOException Thrown, if the channel for the writer could not be opened.
 	 */
-	public abstract BlockChannelWriter<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channelID,
-				LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
+	public abstract BlockChannelWriter<MemorySegment> createBlockChannelWriter(
+		ID channelID,
+		LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
 
 	/**
 	 * Creates a block channel writer that writes to the given channel. The writer calls the given callback
@@ -193,7 +194,9 @@ public abstract class IOManager {
 	 * @return A block channel writer that writes to the given channel.
 	 * @throws IOException Thrown, if the channel for the writer could not be opened.
 	 */
-	public abstract BlockChannelWriterWithCallback<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback<MemorySegment> callback) throws IOException;
+	public abstract BlockChannelWriterWithCallback<MemorySegment> createBlockChannelWriter(
+		ID channelID,
+		RequestDoneCallback<MemorySegment> callback) throws IOException;
 
 	/**
 	 * Creates a block channel reader that reads blocks from the given channel. The reader pushed
@@ -204,8 +207,8 @@ public abstract class IOManager {
 	 * @return A block channel reader that reads from the given channel.
 	 * @throws IOException Thrown, if the channel for the reader could not be opened.
 	 */
-	public BlockChannelReader<MemorySegment> createBlockChannelReader(FileIOChannel.ID channelID) throws IOException {
-		return createBlockChannelReader(channelID, new LinkedBlockingQueue<MemorySegment>());
+	public BlockChannelReader<MemorySegment> createBlockChannelReader(ID channelID) throws IOException {
+		return createBlockChannelReader(channelID, new LinkedBlockingQueue<>());
 	}
 
 	/**
@@ -217,22 +220,27 @@ public abstract class IOManager {
 	 * @return A block channel reader that reads from the given channel.
 	 * @throws IOException Thrown, if the channel for the reader could not be opened.
 	 */
-	public abstract BlockChannelReader<MemorySegment> createBlockChannelReader(FileIOChannel.ID channelID,
-										LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
+	public abstract BlockChannelReader<MemorySegment> createBlockChannelReader(
+		ID channelID,
+		LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
 
-	public abstract BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) throws IOException;
+	public abstract BufferFileWriter createBufferFileWriter(ID channelID) throws IOException;
 
-	public abstract BufferFileReader createBufferFileReader(FileIOChannel.ID channelID, RequestDoneCallback<Buffer> callback) throws IOException;
+	public abstract BufferFileReader createBufferFileReader(
+		ID channelID,
+		RequestDoneCallback<Buffer> callback) throws IOException;
 
-	public abstract BufferFileSegmentReader createBufferFileSegmentReader(FileIOChannel.ID channelID, RequestDoneCallback<FileSegment> callback) throws IOException;
+	public abstract BufferFileSegmentReader createBufferFileSegmentReader(
+		ID channelID,
+		RequestDoneCallback<FileSegment> callback) throws IOException;
 
 	/**
 	 * Creates a block channel reader that reads all blocks from the given channel directly in one bulk.
 	 * The reader draws segments to read the blocks into from a supplied list, which must contain as many
 	 * segments as the channel has blocks. After the reader is done, the list with the full segments can be
 	 * obtained from the reader.
-	 * <p>
-	 * If a channel is not to be read in one bulk, but in multiple smaller batches, a
+	 *
+	 * <p>If a channel is not to be read in one bulk, but in multiple smaller batches, a
 	 * {@link BlockChannelReader} should be used.
 	 *
 	 * @param channelID The descriptor for the channel to write to.
@@ -241,26 +249,19 @@ public abstract class IOManager {
 	 * @return A block channel reader that reads from the given channel.
 	 * @throws IOException Thrown, if the channel for the reader could not be opened.
 	 */
-	public abstract BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID,
-			List<MemorySegment> targetSegments, int numBlocks) throws IOException;
+	public abstract BulkBlockChannelReader createBulkBlockChannelReader(
+		ID channelID,
+		List<MemorySegment> targetSegments,
+		int numBlocks) throws IOException;
 
 
 	// ------------------------------------------------------------------------
 	//                          Utilities
 	// ------------------------------------------------------------------------
-	
-	/**
-	 * Gets the number of directories across which the I/O manager rotates its files.
-	 * 
-	 * @return The number of temporary file directories.
-	 */
-	public int getNumberOfSpillingDirectories() {
-		return this.paths.length;
-	}
 
 	/**
 	 * Gets the directories that the I/O manager spills to.
-	 * 
+	 *
 	 * @return The directories that the I/O manager spills to.
 	 */
 	public File[] getSpillingDirectories() {
@@ -279,8 +280,8 @@ public abstract class IOManager {
 		}
 		return strings;
 	}
-	
-	protected int getNextPathNum() {
+
+	private int getNextPathNum() {
 		final int next = this.nextPath;
 		final int newNext = next + 1;
 		this.nextPath = newNext >= this.paths.length ? 0 : newNext;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java
deleted file mode 100644
index 363e02b..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.disk.iomanager;
-
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-
-import java.io.IOException;
-
-/**
- * An {@link IOManagerAsync} that creates {@link BufferFileWriter} instances which do nothing in their {@link BufferFileWriter#writeBlock(Object)} method.
- *
- * <p>Beware: the passed {@link Buffer} instances must be cleaned up manually!
- */
-public class IOManagerAsyncWithNoOpBufferFileWriter extends IOManagerAsync {
-	@Override
-	public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
-			throws IOException {
-		return new NoOpAsynchronousBufferFileWriter(channelID, getWriteRequestQueue(channelID));
-	}
-
-	/**
-	 * {@link BufferFileWriter} subclass with a no-op in {@link #writeBlock(Buffer)}.
-	 */
-	private static class NoOpAsynchronousBufferFileWriter extends AsynchronousBufferFileWriter {
-
-		private NoOpAsynchronousBufferFileWriter(
-				ID channelID,
-				RequestQueue<WriteRequest> requestQueue) throws IOException {
-			super(channelID, requestQueue);
-		}
-
-		@Override
-		public void writeBlock(Buffer buffer) throws IOException {
-			// do nothing
-		}
-	}
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
index 60d17bf..8f9e4dd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
@@ -18,16 +18,6 @@
 
 package org.apache.flink.runtime.operators.sort;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -50,10 +40,20 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.MutableObjectIterator;
-
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class LargeRecordHandlerITCase extends TestLogger {
 
 	@Test
@@ -262,9 +262,7 @@ public class LargeRecordHandlerITCase extends TestLogger {
 		}
 		finally {
 			if (channel != null) {
-				try {
-					ioMan.deleteChannel(channel);
-				} catch (IOException ignored) {}
+				ioMan.deleteChannel(channel);
 			}
 
 			ioMan.shutdown();


[flink] 02/06: [hotfix][runtime] IOManager implements AutoCloseable

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5809ed32f869fa880a70a774d5b8365fe59dba4a
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Tue Jun 18 12:09:04 2019 +0800

    [hotfix][runtime] IOManager implements AutoCloseable
---
 .../flink/runtime/io/disk/iomanager/IOManager.java |  5 ++--
 .../runtime/io/disk/iomanager/IOManagerAsync.java  |  8 +++---
 .../runtime/taskexecutor/TaskManagerServices.java  |  2 +-
 .../flink/runtime/io/disk/ChannelViewsTest.java    |  2 +-
 .../runtime/io/disk/FileChannelStreamsITCase.java  |  2 +-
 .../runtime/io/disk/FileChannelStreamsTest.java    | 12 ++------
 .../io/disk/SeekableFileChannelInputViewTest.java  |  6 +---
 .../flink/runtime/io/disk/SpillingBufferTest.java  |  2 +-
 .../AsynchronousBufferFileWriterTest.java          |  2 +-
 .../iomanager/AsynchronousFileIOChannelTest.java   | 19 +++----------
 .../BufferFileWriterFileSegmentReaderTest.java     |  2 +-
 .../disk/iomanager/BufferFileWriterReaderTest.java |  2 +-
 .../io/disk/iomanager/IOManagerAsyncTest.java      |  4 +--
 .../runtime/io/disk/iomanager/IOManagerITCase.java |  2 +-
 .../runtime/io/disk/iomanager/IOManagerTest.java   | 33 ++++++++--------------
 .../runtime/operators/hash/HashTableITCase.java    |  2 +-
 .../hash/HashTablePerformanceComparison.java       |  5 +---
 .../runtime/operators/hash/HashTableTest.java      | 21 ++------------
 .../hash/NonReusingHashJoinIteratorITCase.java     |  2 +-
 .../operators/hash/ReOpenableHashTableITCase.java  |  2 +-
 .../hash/ReOpenableHashTableTestBase.java          |  2 +-
 .../hash/ReusingHashJoinIteratorITCase.java        |  2 +-
 .../resettable/SpillingResettableIteratorTest.java |  2 +-
 ...pillingResettableMutableObjectIteratorTest.java |  2 +-
 .../AbstractSortMergeOuterJoinIteratorITCase.java  |  2 +-
 .../sort/CombiningUnilateralSortMergerITCase.java  |  2 +-
 .../runtime/operators/sort/ExternalSortITCase.java |  2 +-
 .../sort/ExternalSortLargeRecordsITCase.java       |  2 +-
 .../sort/FixedLengthRecordSorterTest.java          |  2 +-
 .../operators/sort/LargeRecordHandlerITCase.java   | 18 ++----------
 .../operators/sort/LargeRecordHandlerTest.java     | 21 ++------------
 ...NonReusingSortMergeInnerJoinIteratorITCase.java |  2 +-
 .../ReusingSortMergeInnerJoinIteratorITCase.java   |  2 +-
 .../operators/sort/UnilateralSortMergerTest.java   |  4 +--
 .../testutils/BinaryOperatorTestBase.java          |  2 +-
 .../operators/testutils/DriverTestBase.java        |  2 +-
 .../operators/testutils/MockEnvironment.java       |  2 +-
 .../operators/testutils/UnaryOperatorTestBase.java |  2 +-
 .../operators/util/HashVsSortMiniBenchmark.java    |  2 +-
 .../streaming/runtime/io/BufferSpillerTest.java    |  2 +-
 ...CheckpointBarrierAlignerAlignmentLimitTest.java |  2 +-
 .../CheckpointBarrierAlignerMassiveRandomTest.java |  7 +----
 .../io/SpillingCheckpointBarrierAlignerTest.java   |  2 +-
 .../StreamNetworkBenchmarkEnvironment.java         |  2 +-
 .../runtime/tasks/StreamTaskTestHarness.java       |  2 +-
 .../flink/table/runtime/aggregate/HashAggTest.java |  2 +-
 .../runtime/hashtable/BinaryHashTableTest.java     |  2 +-
 .../io/CompressedHeaderlessChannelTest.java        |  2 +-
 .../join/Int2SortMergeJoinOperatorTest.java        |  2 +-
 .../runtime/sort/BinaryExternalSorterTest.java     |  2 +-
 .../runtime/sort/BufferedKVExternalSorterTest.java |  2 +-
 .../manual/HashTableRecordWidthCombinations.java   |  7 +----
 .../flink/test/manual/MassiveStringSorting.java    | 14 ++-------
 .../test/manual/MassiveStringValueSorting.java     | 14 ++-------
 54 files changed, 82 insertions(+), 192 deletions(-)
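
With IOManager implementing AutoCloseable, the tests below move from explicit shutdown() calls in finally blocks to try-with-resources. A minimal sketch of the pattern; the sketch class name and the elided write steps are illustrative only:

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;

public class TryWithResourcesSketch {
	public static void main(String[] args) throws Exception {
		// close() runs automatically at the end of the block, replacing the
		// explicit shutdown() calls previously made in finally clauses.
		try (IOManager ioManager = new IOManagerAsync()) {
			FileIOChannel.ID channel = ioManager.createChannel();
			BlockChannelWriter<MemorySegment> writer = ioManager.createBlockChannelWriter(channel);

			// ... write memory segments through the writer ...

			writer.closeAndDelete();
		}
	}
}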

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index 6723597..ee54b1e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -37,7 +37,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 /**
  * The facade for the provided I/O manager services.
  */
-public abstract class IOManager {
+public abstract class IOManager implements AutoCloseable {
 	protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
 
 	/** The temporary directories for files. */
@@ -85,7 +85,8 @@ public abstract class IOManager {
 	 * Close method, marks the I/O manager as closed
 	 * and removed all temporary files.
 	 */
-	public void shutdown() {
+	@Override
+	public void close() {
 		// remove all of our temp directories
 		for (File path : paths) {
 			try {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
index 0c3c8f1..ffa4dcf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
@@ -100,7 +100,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 		}
 
 		// install a shutdown hook that makes sure the temp directories get deleted
-		this.shutdownHook = ShutdownHookUtil.addShutdownHook(this::shutdown, getClass().getSimpleName(), LOG);
+		this.shutdownHook = ShutdownHookUtil.addShutdownHook(this::close, getClass().getSimpleName(), LOG);
 	}
 
 	/**
@@ -109,7 +109,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 	 * operation.
 	 */
 	@Override
-	public void shutdown() {
+	public void close() {
 		// mark shut down and exit if it already was shut down
 		if (!isShutdown.compareAndSet(false, true)) {
 			return;
@@ -157,7 +157,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 		finally {
 			// make sure we call the super implementation in any case and at the last point,
 			// because this will clean up the I/O directories
-			super.shutdown();
+			super.close();
 		}
 	}
 	
@@ -186,7 +186,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 	@Override
 	public void uncaughtException(Thread t, Throwable e) {
 		LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Shutting down I/O Manager.", e);
-		shutdown();
+		close();
 	}
 	
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 35236d9..0aafce0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -179,7 +179,7 @@ public class TaskManagerServices {
 		}
 
 		try {
-			ioManager.shutdown();
+			ioManager.close();
 		} catch (Exception e) {
 			exception = ExceptionUtils.firstOrSuppressed(e, exception);
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
index 8c7ca1b..a617109 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
@@ -83,7 +83,7 @@ public class ChannelViewsTest
 
 	@After
 	public void afterTest() {
-		this.ioManager.shutdown();
+		this.ioManager.close();
 		if (!this.ioManager.isProperlyShutDown()) {
 			Assert.fail("I/O Manager was not properly shut down.");
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
index 7ffb58a..44fe849 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
@@ -74,7 +74,7 @@ public class FileChannelStreamsITCase extends TestLogger {
 
 	@After
 	public void afterTest() {
-		ioManager.shutdown();
+		ioManager.close();
 		assertTrue("I/O Manager was not properly shut down.", ioManager.isProperlyShutDown());
 		assertTrue("The memory has not been properly released", memManager.verifyEmpty());
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
index 1044a35..6bba3a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
@@ -42,8 +42,7 @@ public class FileChannelStreamsTest {
 
 	@Test
 	public void testCloseAndDeleteOutputView() {
-		final IOManager ioManager = new IOManagerAsync();
-		try {
+		try (IOManager ioManager = new IOManagerAsync()) {
 			MemoryManager memMan = new MemoryManager(4 * 16*1024, 1, 16*1024, MemoryType.HEAP, true);
 			List<MemorySegment> memory = new ArrayList<MemorySegment>();
 			memMan.allocatePages(new DummyInvokable(), memory, 4);
@@ -69,15 +68,11 @@ public class FileChannelStreamsTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			ioManager.shutdown();
-		}
 	}
 	
 	@Test
 	public void testCloseAndDeleteInputView() {
-		final IOManager ioManager = new IOManagerAsync();
-		try {
+		try (IOManager ioManager = new IOManagerAsync()) {
 			MemoryManager memMan = new MemoryManager(4 * 16*1024, 1, 16*1024, MemoryType.HEAP, true);
 			List<MemorySegment> memory = new ArrayList<MemorySegment>();
 			memMan.allocatePages(new DummyInvokable(), memory, 4);
@@ -110,8 +105,5 @@ public class FileChannelStreamsTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			ioManager.shutdown();
-		}
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
index 4c6a2b3..1c5a2ad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
@@ -39,12 +39,11 @@ public class SeekableFileChannelInputViewTest {
 
 	@Test
 	public void testSeek() {
-		final IOManager ioManager = new IOManagerAsync();
 		final int PAGE_SIZE = 16 * 1024;
 		final int NUM_RECORDS = 120000;
 		// integers across 7.x pages (7 pages = 114.688 bytes, 8 pages = 131.072 bytes)
 		
-		try {
+		try (IOManager ioManager = new IOManagerAsync()) {
 			MemoryManager memMan = new MemoryManager(4 * PAGE_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
 			List<MemorySegment> memory = new ArrayList<MemorySegment>();
 			memMan.allocatePages(new DummyInvokable(), memory, 4);
@@ -150,8 +149,5 @@ public class SeekableFileChannelInputViewTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			ioManager.shutdown();
-		}
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
index 01a9723..f94735c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
@@ -72,7 +72,7 @@ public class SpillingBufferTest {
 
 	@After
 	public void afterTest() {
-		ioManager.shutdown();
+		ioManager.close();
 		if (!ioManager.isProperlyShutDown()) {
 			Assert.fail("I/O Manager was not properly shut down.");
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
index bc4c42a..5ef3983 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
@@ -59,7 +59,7 @@ public class AsynchronousBufferFileWriterTest {
 
 	@AfterClass
 	public static void shutdown() {
-		ioManager.shutdown();
+		ioManager.close();
 	}
 
 	@Before
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
index e3d5907..ead6490 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
@@ -58,7 +58,6 @@ public class AsynchronousFileIOChannelTest {
 		final int numberOfRequests = 100;
 
 		// -- Setup -----------------------------------------------------------
-		final IOManagerAsync ioManager = new IOManagerAsync();
 
 		final ExecutorService executor = Executors.newFixedThreadPool(3);
 
@@ -71,7 +70,7 @@ public class AsynchronousFileIOChannelTest {
 		final TestNotificationListener listener = new TestNotificationListener();
 
 		// -- The Test --------------------------------------------------------
-		try {
+		try (final IOManagerAsync ioManager = new IOManagerAsync()) {
 			// Repeatedly add requests and process them and have one thread try to register as a
 			// listener until the channel is closed and all requests are processed.
 
@@ -177,7 +176,6 @@ public class AsynchronousFileIOChannelTest {
 			}
 		}
 		finally {
-			ioManager.shutdown();
 			executor.shutdown();
 		}
 	}
@@ -188,7 +186,6 @@ public class AsynchronousFileIOChannelTest {
 		final int numberOfRuns = 1024;
 
 		// -- Setup -----------------------------------------------------------
-		final IOManagerAsync ioManager = new IOManagerAsync();
 
 		final ExecutorService executor = Executors.newFixedThreadPool(2);
 
@@ -200,7 +197,7 @@ public class AsynchronousFileIOChannelTest {
 		final TestNotificationListener listener = new TestNotificationListener();
 
 		// -- The Test --------------------------------------------------------
-		try {
+		try (final IOManagerAsync ioManager = new IOManagerAsync()) {
 			// Repeatedly close the channel and add a request.
 			for (int i = 0; i < numberOfRuns; i++) {
 				final TestAsyncFileIOChannel ioChannel = new TestAsyncFileIOChannel(
@@ -264,15 +261,13 @@ public class AsynchronousFileIOChannelTest {
 			}
 		}
 		finally {
-			ioManager.shutdown();
 			executor.shutdown();
 		}
 	}
 
 	@Test
 	public void testClosingWaits() {
-		IOManagerAsync ioMan = new IOManagerAsync();
-		try {
+		try (final IOManagerAsync ioMan = new IOManagerAsync()) {
 
 			final int NUM_BLOCKS = 100;
 			final MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(32 * 1024);
@@ -318,20 +313,14 @@ public class AsynchronousFileIOChannelTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			ioMan.shutdown();
-		}
 	}
 
 	@Test
 	public void testExceptionForwardsToClose() {
-		IOManagerAsync ioMan = new IOManagerAsync();
-		try {
+		try (IOManagerAsync ioMan = new IOManagerAsync()) {
 			testExceptionForwardsToClose(ioMan, 100, 1);
 			testExceptionForwardsToClose(ioMan, 100, 50);
 			testExceptionForwardsToClose(ioMan, 100, 100);
-		} finally {
-			ioMan.shutdown();
 		}
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
index 1aaefcb..d2c388c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
@@ -61,7 +61,7 @@ public class BufferFileWriterFileSegmentReaderTest {
 
 	@AfterClass
 	public static void shutdown() {
-		ioManager.shutdown();
+		ioManager.close();
 	}
 
 	@Before
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
index 29e7b44..78026bf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
@@ -58,7 +58,7 @@ public class BufferFileWriterReaderTest {
 
 	@AfterClass
 	public static void shutdown() {
-		ioManager.shutdown();
+		ioManager.close();
 	}
 
 	@Before
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
index 4656d56..a4d3795 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
@@ -47,7 +47,7 @@ public class IOManagerAsyncTest {
 
 	@After
 	public void afterTest() {
-		this.ioManager.shutdown();
+		this.ioManager.close();
 		assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown());
 	}
 
@@ -352,4 +352,4 @@ public class IOManagerAsyncTest {
 	final class TestIOException extends IOException {
 		private static final long serialVersionUID = -814705441998024472L;
 	}
-}
\ No newline at end of file
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
index 852e6e7..4a3f13f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
@@ -65,7 +65,7 @@ public class IOManagerITCase extends TestLogger {
 
 	@After
 	public void afterTest() throws Exception {
-		ioManager.shutdown();
+		ioManager.close();
 		Assert.assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown());
 		
 		Assert.assertTrue("Not all memory was returned to the memory manager in the test.", memoryManager.verifyEmpty());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
index 156098e..ce16bba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
@@ -41,22 +41,18 @@ public class IOManagerTest {
 
 	@Test
 	public void channelEnumerator() throws IOException {
-		IOManager ioMan = null;
-
-		try {
-			File tempPath = temporaryFolder.newFolder();
-
-			String[] tempDirs = new String[]{
-					new File(tempPath, "a").getAbsolutePath(),
-					new File(tempPath, "b").getAbsolutePath(),
-					new File(tempPath, "c").getAbsolutePath(),
-					new File(tempPath, "d").getAbsolutePath(),
-					new File(tempPath, "e").getAbsolutePath(),
-			};
-
-			int[] counters = new int[tempDirs.length];
-
-			ioMan = new TestIOManager(tempDirs);
+		File tempPath = temporaryFolder.newFolder();
+
+		String[] tempDirs = new String[]{
+			new File(tempPath, "a").getAbsolutePath(),
+			new File(tempPath, "b").getAbsolutePath(),
+			new File(tempPath, "c").getAbsolutePath(),
+			new File(tempPath, "d").getAbsolutePath(),
+			new File(tempPath, "e").getAbsolutePath(),
+		};
+
+		int[] counters = new int[tempDirs.length];
+		try (IOManager ioMan = new TestIOManager(tempDirs) ) {
 			FileIOChannel.Enumerator enumerator = ioMan.createChannelEnumerator();
 
 			for (int i = 0; i < 3 * tempDirs.length; i++) {
@@ -81,11 +77,6 @@ public class IOManagerTest {
 				assertEquals(3, counters[k]);
 			}
 		}
-		finally {
-			if (ioMan != null) {
-				ioMan.shutdown();
-			}
-		}
 	}
 	
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
index 85315b7..cde6d89 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
@@ -103,7 +103,7 @@ public class HashTableITCase extends TestLogger {
 	public void tearDown()
 	{
 		// shut down I/O manager and Memory Manager and verify the correct shutdown
-		this.ioManager.shutdown();
+		this.ioManager.close();
 		if (!this.ioManager.isProperlyShutDown()) {
 			fail("I/O manager was not property shut down.");
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
index bc9daf2..c8748cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
@@ -130,8 +130,7 @@ public class HashTablePerformanceComparison {
 	
 	@Test
 	public void testMutableHashMapPerformance() {
-		final IOManager ioManager = new IOManagerAsync();
-		try {
+		try (IOManager ioManager = new IOManagerAsync()) {
 			final int NUM_MEM_PAGES = SIZE * NUM_PAIRS / PAGE_SIZE;
 			
 			MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(NUM_PAIRS, 1, false);
@@ -206,8 +205,6 @@ public class HashTablePerformanceComparison {
 		catch (Exception e) {
 			e.printStackTrace();
 			fail("Error: " + e.getMessage());
-		} finally {
-			ioManager.shutdown();
 		}
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
index bcf620c..2e3748f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
@@ -110,10 +110,7 @@ public class HashTableTest {
 	 */
 	@Test
 	public void testBufferMissingForProbing() {
-
-		final IOManager ioMan = new IOManagerAsync();
-
-		try {
+		try (final IOManager ioMan = new IOManagerAsync()) {
 			final int pageSize = 32*1024;
 			final int numSegments = 34;
 			final int numRecords = 3400;
@@ -151,9 +148,6 @@ public class HashTableTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			ioMan.shutdown();
-		}
 	}
 
 	/**
@@ -163,8 +157,6 @@ public class HashTableTest {
 	 */
 	@Test
 	public void testSpillingFreesOnlyOverflowSegments() {
-		final IOManager ioMan = new IOManagerAsync();
-		
 		final TypeSerializer<ByteValue> serializer = ByteValueSerializer.INSTANCE;
 		final TypeComparator<ByteValue> buildComparator = new ValueComparator<>(true, ByteValue.class);
 		final TypeComparator<ByteValue> probeComparator = new ValueComparator<>(true, ByteValue.class);
@@ -172,7 +164,7 @@ public class HashTableTest {
 		@SuppressWarnings("unchecked")
 		final TypePairComparator<ByteValue, ByteValue> pairComparator = Mockito.mock(TypePairComparator.class);
 		
-		try {
+		try (final IOManager ioMan = new IOManagerAsync()) {
 			final int pageSize = 32*1024;
 			final int numSegments = 34;
 
@@ -192,9 +184,6 @@ public class HashTableTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			ioMan.shutdown();
-		}
 	}
 
 	/**
@@ -203,9 +192,7 @@ public class HashTableTest {
 	 */
 	@Test
 	public void testSpillingWhenBuildingTableWithoutOverflow() throws Exception {
-		final IOManager ioMan = new IOManagerAsync();
-
-		try {
+		try (final IOManager ioMan = new IOManagerAsync()) {
 			final TypeSerializer<byte[]> serializer = BytePrimitiveArraySerializer.INSTANCE;
 			final TypeComparator<byte[]> buildComparator = new BytePrimitiveArrayComparator(true);
 			final TypeComparator<byte[]> probeComparator = new BytePrimitiveArrayComparator(true);
@@ -254,8 +241,6 @@ public class HashTableTest {
 			}
 
 			table.close();
-		} finally {
-			ioMan.shutdown();
 		}
 	}
 	
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java
index 802870b..4512ee8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java
@@ -105,7 +105,7 @@ public class NonReusingHashJoinIteratorITCase extends TestLogger {
 	@After
 	public void afterTest() {
 		if (this.ioManager != null) {
-			this.ioManager.shutdown();
+			this.ioManager.close();
 			if (!this.ioManager.isProperlyShutDown()) {
 				Assert.fail("I/O manager failed to properly shut down.");
 			}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
index 020f1c3..754da40 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
@@ -83,7 +83,7 @@ public class ReOpenableHashTableITCase extends TestLogger {
 	@After
 	public void afterTest() {
 		if (this.ioManager != null) {
-			this.ioManager.shutdown();
+			this.ioManager.close();
 			if (!this.ioManager.isProperlyShutDown()) {
 				Assert.fail("I/O manager failed to properly shut down.");
 			}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java
index 4008aa2..e6ac58a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java
@@ -94,7 +94,7 @@ public abstract class ReOpenableHashTableTestBase extends TestLogger {
 	@After
 	public void afterTest() {
 		if (this.ioManager != null) {
-			this.ioManager.shutdown();
+			this.ioManager.close();
 			if (!this.ioManager.isProperlyShutDown()) {
 				Assert.fail("I/O manager failed to properly shut down.");
 			}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java
index 8a51102..c199e38 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java
@@ -113,7 +113,7 @@ public class ReusingHashJoinIteratorITCase extends TestLogger {
 	@After
 	public void afterTest() {
 		if (this.ioManager != null) {
-			this.ioManager.shutdown();
+			this.ioManager.close();
 			if (!this.ioManager.isProperlyShutDown()) {
 				Assert.fail("I/O manager failed to properly shut down.");
 			}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
index 0ab9a54..3f5a25d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
@@ -72,7 +72,7 @@ public class SpillingResettableIteratorTest {
 
 	@After
 	public void shutdown() {
-		this.ioman.shutdown();
+		this.ioman.close();
 		if (!this.ioman.isProperlyShutDown()) {
 			Assert.fail("I/O Manager Shutdown was not completed properly.");
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
index ef48a1f..2a69a63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
@@ -69,7 +69,7 @@ public class SpillingResettableMutableObjectIteratorTest {
 
 	@After
 	public void shutdown() {
-		this.ioman.shutdown();
+		this.ioman.close();
 		if (!this.ioman.isProperlyShutDown()) {
 			Assert.fail("I/O Manager Shutdown was not completed properly.");
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
index 94c0fd4..7cdf07f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
@@ -111,7 +111,7 @@ public abstract class AbstractSortMergeOuterJoinIteratorITCase extends TestLogge
 	@After
 	public void afterTest() {
 		if (this.ioManager != null) {
-			this.ioManager.shutdown();
+			this.ioManager.close();
 			if (!this.ioManager.isProperlyShutDown()) {
 				Assert.fail("I/O manager failed to properly shut down.");
 			}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
index d32cad0..96831f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
@@ -93,7 +93,7 @@ public class CombiningUnilateralSortMergerITCase extends TestLogger {
 
 	@After
 	public void afterTest() {
-		this.ioManager.shutdown();
+		this.ioManager.close();
 		if (!this.ioManager.isProperlyShutDown()) {
 			Assert.fail("I/O Manager was not properly shut down.");
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
index b30adc2..d1aa0a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
@@ -86,7 +86,7 @@ public class ExternalSortITCase extends TestLogger {
 
 	@After
 	public void afterTest() {
-		this.ioManager.shutdown();
+		this.ioManager.close();
 		if (!this.ioManager.isProperlyShutDown()) {
 			Assert.fail("I/O Manager was not properly shut down.");
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
index 530951f..335dbee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
@@ -72,7 +72,7 @@ public class ExternalSortLargeRecordsITCase extends TestLogger {
 
 	@After
 	public void afterTest() {
-		this.ioManager.shutdown();
+		this.ioManager.close();
 		if (!this.ioManager.isProperlyShutDown()) {
 			Assert.fail("I/O Manager was not properly shut down.");
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
index bba713e..2f05c78 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
@@ -79,7 +79,7 @@ public class FixedLengthRecordSorterTest {
 		}
 
 		if (this.ioManager != null) {
-			ioManager.shutdown();
+			ioManager.close();
 			ioManager = null;
 		}
 		
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
index 8f9e4dd..e73ee8c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
@@ -58,13 +58,11 @@ public class LargeRecordHandlerITCase extends TestLogger {
 
 	@Test
 	public void testRecordHandlerCompositeKey() {
-		
-		final IOManager ioMan = new IOManagerAsync();
 		final int PAGE_SIZE = 4 * 1024;
 		final int NUM_PAGES = 1000;
 		final int NUM_RECORDS = 10;
 		
-		try {
+		try (final IOManager ioMan = new IOManagerAsync()) {
 			final MemoryManager memMan = new MemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
 			final AbstractInvokable owner = new DummyInvokable();
 			
@@ -145,9 +143,6 @@ public class LargeRecordHandlerITCase extends TestLogger {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			ioMan.shutdown();
-		}
 	}
 	
 	public static final class SomeVeryLongValue implements Value {
@@ -193,15 +188,13 @@ public class LargeRecordHandlerITCase extends TestLogger {
 	
 	@Test
 	public void fileTest() {
-		
-		final IOManager ioMan = new IOManagerAsync();
 		final int PAGE_SIZE = 4 * 1024;
 		final int NUM_PAGES = 4;
 		final int NUM_RECORDS = 10;
 		
 		FileIOChannel.ID channel = null;
 		
-		try {
+		try (final IOManager ioMan = new IOManagerAsync()) {
 			final MemoryManager memMan = new MemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
 			final AbstractInvokable owner = new DummyInvokable();
 			
@@ -260,13 +253,6 @@ public class LargeRecordHandlerITCase extends TestLogger {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			if (channel != null) {
-				ioMan.deleteChannel(channel);
-			}
-
-			ioMan.shutdown();
-		}
 	}
 
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
index a59e630..7e9acf6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
@@ -46,12 +46,10 @@ public class LargeRecordHandlerTest {
 
 	@Test
 	public void testEmptyRecordHandler() {
-		
-		final IOManager ioMan = new IOManagerAsync();
 		final int PAGE_SIZE = 4 * 1024;
 		final int NUM_PAGES = 50;
 		
-		try {
+		try (final IOManager ioMan = new IOManagerAsync()) {
 			final MemoryManager memMan = new MemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
 			final AbstractInvokable owner = new DummyInvokable();
 			final List<MemorySegment> memory = memMan.allocatePages(owner, NUM_PAGES);
@@ -88,20 +86,15 @@ public class LargeRecordHandlerTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			ioMan.shutdown();
-		}
 	}
 	
 	@Test
 	public void testRecordHandlerSingleKey() {
-		
-		final IOManager ioMan = new IOManagerAsync();
 		final int PAGE_SIZE = 4 * 1024;
 		final int NUM_PAGES = 24;
 		final int NUM_RECORDS = 25000;
 		
-		try {
+		try (final IOManager ioMan = new IOManagerAsync()) {
 			final MemoryManager memMan = new MemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
 			final AbstractInvokable owner = new DummyInvokable();
 			
@@ -174,20 +167,15 @@ public class LargeRecordHandlerTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			ioMan.shutdown();
-		}
 	}
 	
 	@Test
 	public void testRecordHandlerCompositeKey() {
-		
-		final IOManager ioMan = new IOManagerAsync();
 		final int PAGE_SIZE = 4 * 1024;
 		final int NUM_PAGES = 24;
 		final int NUM_RECORDS = 25000;
 		
-		try {
+		try (final IOManager ioMan = new IOManagerAsync()) {
 			final MemoryManager memMan = new MemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
 			final AbstractInvokable owner = new DummyInvokable();
 			
@@ -262,8 +250,5 @@ public class LargeRecordHandlerTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			ioMan.shutdown();
-		}
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
index 394d44c..c729a3a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
@@ -112,7 +112,7 @@ public class NonReusingSortMergeInnerJoinIteratorITCase extends TestLogger {
 	@After
 	public void afterTest() {
 		if (this.ioManager != null) {
-			this.ioManager.shutdown();
+			this.ioManager.close();
 			if (!this.ioManager.isProperlyShutDown()) {
 				Assert.fail("I/O manager failed to properly shut down.");
 			}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
index 8e5bd95..d149084 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
@@ -112,7 +112,7 @@ public class ReusingSortMergeInnerJoinIteratorITCase extends TestLogger {
 	@After
 	public void afterTest() {
 		if (this.ioManager != null) {
-			this.ioManager.shutdown();
+			this.ioManager.close();
 			if (!this.ioManager.isProperlyShutDown()) {
 				Assert.fail("I/O manager failed to properly shut down.");
 			}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java
index cdaa5b1..c014e23 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java
@@ -52,10 +52,9 @@ public class UnilateralSortMergerTest extends TestLogger {
 
 		final int numPages = 32;
 		final MemoryManager memoryManager = new MemoryManager(MemoryManager.DEFAULT_PAGE_SIZE * numPages, 1);
-		final IOManagerAsync ioManager = new IOManagerAsync();
 		final DummyInvokable parentTask = new DummyInvokable();
 
-		try {
+		try (final IOManagerAsync ioManager = new IOManagerAsync()) {
 			final List<MemorySegment> memory = memoryManager.allocatePages(parentTask, numPages);
 			final UnilateralSortMerger<Tuple2<Integer, Integer>> unilateralSortMerger = new UnilateralSortMerger<>(
 				memoryManager,
@@ -85,7 +84,6 @@ public class UnilateralSortMergerTest extends TestLogger {
 				assertThat(inMemorySorter.isDisposed(), is(true));
 			}
 		} finally {
-			ioManager.shutdown();
 			memoryManager.shutdown();
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
index a76f110..d2a4f6a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
@@ -388,7 +388,7 @@ public abstract class BinaryOperatorTestBase<S extends Function, IN, OUT> extend
 		this.sorters.clear();
 		
 		// 2nd, shutdown I/O
-		this.ioManager.shutdown();
+		this.ioManager.close();
 		Assert.assertTrue("I/O Manager has not properly shut down.", this.ioManager.isProperlyShutDown());
 		
 		// last, verify all memory is returned and shutdown mem manager
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 3820bf9..e1d2cb9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -385,7 +385,7 @@ public abstract class DriverTestBase<S extends Function> extends TestLogger impl
 		this.sorters.clear();
 		
 		// 2nd, shutdown I/O
-		this.ioManager.shutdown();
+		this.ioManager.close();
 		Assert.assertTrue("I/O Manager has not properly shut down.", this.ioManager.isProperlyShutDown());
 
 		// last, verify all memory is returned and shutdown mem manager
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 89a6ea4..2c96ca4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -343,7 +343,7 @@ public class MockEnvironment implements Environment, AutoCloseable {
 		}
 
 		memManager.shutdown();
-		ioManager.shutdown();
+		ioManager.close();
 
 		checkState(ioManager.isProperlyShutDown(), "IO Manager has not properly shut down.");
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
index 2ef82da..1ff50b0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -378,7 +378,7 @@ public abstract class UnaryOperatorTestBase<S extends Function, IN, OUT> extends
 		}
 		
 		// 2nd, shutdown I/O
-		this.ioManager.shutdown();
+		this.ioManager.close();
 		Assert.assertTrue("I/O Manager has not properly shut down.", this.ioManager.isProperlyShutDown());
 
 		// last, verify all memory is returned and shutdown mem manager
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index 2696045..7b4eaba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -106,7 +106,7 @@ public class HashVsSortMiniBenchmark {
 		}
 		
 		if (this.ioManager != null) {
-			this.ioManager.shutdown();
+			this.ioManager.close();
 			if (!this.ioManager.isProperlyShutDown()) {
 				Assert.fail("I/O manager failed to properly shut down.");
 			}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
index 4d46451..d19b99c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
@@ -55,7 +55,7 @@ public class BufferSpillerTest extends BufferStorageTestBase {
 
 	@AfterClass
 	public static void shutdownIOManager() {
-		ioManager.shutdown();
+		ioManager.close();
 	}
 
 	@Before
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
index 0621179..535dfc0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
@@ -78,7 +78,7 @@ public class CheckpointBarrierAlignerAlignmentLimitTest {
 
 	@AfterClass
 	public static void shutdownIOManager() {
-		ioManager.shutdown();
+		ioManager.close();
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
index 82920aa..2b452bd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
@@ -47,11 +47,9 @@ public class CheckpointBarrierAlignerMassiveRandomTest {
 
 	@Test
 	public void testWithTwoChannelsAndRandomBarriers() {
-		IOManager ioMan = null;
 		NetworkBufferPool networkBufferPool1 = null;
 		NetworkBufferPool networkBufferPool2 = null;
-		try {
-			ioMan = new IOManagerAsync();
+		try (IOManager ioMan = new IOManagerAsync()) {
 
 			networkBufferPool1 = new NetworkBufferPool(100, PAGE_SIZE, 1);
 			networkBufferPool2 = new NetworkBufferPool(100, PAGE_SIZE, 1);
@@ -76,9 +74,6 @@ public class CheckpointBarrierAlignerMassiveRandomTest {
 			fail(e.getMessage());
 		}
 		finally {
-			if (ioMan != null) {
-				ioMan.shutdown();
-			}
 			if (networkBufferPool1 != null) {
 				networkBufferPool1.destroyAllBufferPools();
 				networkBufferPool1.destroy();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java
index c892073..0d28325 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java
@@ -49,7 +49,7 @@ public class SpillingCheckpointBarrierAlignerTest extends CheckpointBarrierAlign
 
 	@AfterClass
 	public static void shutdownIOManager() {
-		ioManager.shutdown();
+		ioManager.close();
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 0c399f7..b7d7430 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -168,7 +168,7 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 	public void tearDown() {
 		suppressExceptions(senderEnv::close);
 		suppressExceptions(receiverEnv::close);
-		suppressExceptions(ioManager::shutdown);
+		suppressExceptions(ioManager::close);
 	}
 
 	public SerializingLongReceiver createReceiver() throws Exception {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 12fa8be..c059757 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -325,7 +325,7 @@ public class StreamTaskTestHarness<OUT> {
 	}
 
 	private void shutdownIOManager() throws Exception {
-		this.mockEnv.getIOManager().shutdown();
+		this.mockEnv.getIOManager().close();
 		Assert.assertTrue("IO Manager has not properly shut down.", this.mockEnv.getIOManager().isProperlyShutDown());
 	}
 
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/aggregate/HashAggTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/aggregate/HashAggTest.java
index 9bab9b9..f35a40c 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/aggregate/HashAggTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/aggregate/HashAggTest.java
@@ -95,7 +95,7 @@ public class HashAggTest {
 
 	@After
 	public void afterTest() {
-		this.ioManager.shutdown();
+		this.ioManager.close();
 		if (!this.ioManager.isProperlyShutDown()) {
 			Assert.fail("I/O Manager was not properly shut down.");
 		}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java
index ee08da5..de4e949 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java
@@ -95,7 +95,7 @@ public class BinaryHashTableTest {
 	@After
 	public void tearDown() {
 		// shut down I/O manager and Memory Manager and verify the correct shutdown
-		this.ioManager.shutdown();
+		this.ioManager.close();
 		if (!this.ioManager.isProperlyShutDown()) {
 			fail("I/O manager was not property shut down.");
 		}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java
index 177e83b..14039c8 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java
@@ -51,7 +51,7 @@ public class CompressedHeaderlessChannelTest {
 
 	@After
 	public void afterTest() {
-		this.ioManager.shutdown();
+		this.ioManager.close();
 		if (!this.ioManager.isProperlyShutDown()) {
 			Assert.fail("I/O Manager was not properly shut down.");
 		}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/join/Int2SortMergeJoinOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/join/Int2SortMergeJoinOperatorTest.java
index 1402c11..7b47903 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/join/Int2SortMergeJoinOperatorTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/join/Int2SortMergeJoinOperatorTest.java
@@ -78,7 +78,7 @@ public class Int2SortMergeJoinOperatorTest {
 	@After
 	public void tearDown() {
 		// shut down I/O manager and Memory Manager and verify the correct shutdown
-		this.ioManager.shutdown();
+		this.ioManager.close();
 		if (!this.ioManager.isProperlyShutDown()) {
 			fail("I/O manager was not property shut down.");
 		}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java
index 8b389c5..8eb62d0 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java
@@ -101,7 +101,7 @@ public class BinaryExternalSorterTest {
 
 	@After
 	public void afterTest() {
-		this.ioManager.shutdown();
+		this.ioManager.close();
 		if (!this.ioManager.isProperlyShutDown()) {
 			Assert.fail("I/O Manager was not properly shut down.");
 		}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java
index 1928696..e095df1 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java
@@ -103,7 +103,7 @@ public class BufferedKVExternalSorterTest {
 
 	@After
 	public void afterTest() {
-		this.ioManager.shutdown();
+		this.ioManager.close();
 		if (!this.ioManager.isProperlyShutDown()) {
 			Assert.fail("I/O Manager was not properly shut down.");
 		}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java b/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
index f02cf1c..f1664ae 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
@@ -85,9 +85,7 @@ public class HashTableRecordWidthCombinations {
 			}
 		};
 
-		final IOManager ioMan = new IOManagerAsync();
-
-		try {
+		try (final IOManager ioMan = new IOManagerAsync()) {
 			final int pageSize = 32 * 1024;
 			final int numSegments = 34;
 
@@ -175,9 +173,6 @@ public class HashTableRecordWidthCombinations {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			ioMan.shutdown();
-		}
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
index eb99909..1d48eee 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
@@ -83,11 +83,9 @@ public class MassiveStringSorting {
 			BufferedReader reader = null;
 			BufferedReader verifyReader = null;
 			MemoryManager mm = null;
-			IOManager ioMan = null;
 
-			try {
+			try (IOManager ioMan = new IOManagerAsync()) {
 				mm = new MemoryManager(1024 * 1024, 1);
-				ioMan = new IOManagerAsync();
 
 				TypeSerializer<String> serializer = StringSerializer.INSTANCE;
 				TypeComparator<String> comparator = new StringComparator(true);
@@ -127,9 +125,6 @@ public class MassiveStringSorting {
 				if (mm != null) {
 					mm.shutdown();
 				}
-				if (ioMan != null) {
-					ioMan.shutdown();
-				}
 			}
 		}
 		catch (Exception e) {
@@ -182,11 +177,9 @@ public class MassiveStringSorting {
 			BufferedReader reader = null;
 			BufferedReader verifyReader = null;
 			MemoryManager mm = null;
-			IOManager ioMan = null;
 
-			try {
+			try (IOManager ioMan = new IOManagerAsync()) {
 				mm = new MemoryManager(1024 * 1024, 1);
-				ioMan = new IOManagerAsync();
 
 				TupleTypeInfo<Tuple2<String, String[]>> typeInfo = (TupleTypeInfo<Tuple2<String, String[]>>)
 						new TypeHint<Tuple2<String, String[]>>(){}.getTypeInfo();
@@ -256,9 +249,6 @@ public class MassiveStringSorting {
 				if (mm != null) {
 					mm.shutdown();
 				}
-				if (ioMan != null) {
-					ioMan.shutdown();
-				}
 			}
 		}
 		catch (Exception e) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
index 5dcf209..861f1df 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
@@ -84,11 +84,9 @@ public class MassiveStringValueSorting {
 			BufferedReader reader = null;
 			BufferedReader verifyReader = null;
 			MemoryManager mm = null;
-			IOManager ioMan = null;
 
-			try {
+			try (IOManager ioMan = new IOManagerAsync()) {
 				mm = new MemoryManager(1024 * 1024, 1);
-				ioMan = new IOManagerAsync();
 
 				TypeSerializer<StringValue> serializer = new CopyableValueSerializer<StringValue>(StringValue.class);
 				TypeComparator<StringValue> comparator = new CopyableValueComparator<StringValue>(true, StringValue.class);
@@ -129,9 +127,6 @@ public class MassiveStringValueSorting {
 				if (mm != null) {
 					mm.shutdown();
 				}
-				if (ioMan != null) {
-					ioMan.shutdown();
-				}
 			}
 		}
 		catch (Exception e) {
@@ -186,11 +181,9 @@ public class MassiveStringValueSorting {
 			BufferedReader reader = null;
 			BufferedReader verifyReader = null;
 			MemoryManager mm = null;
-			IOManager ioMan = null;
 
-			try {
+			try (IOManager ioMan = new IOManagerAsync()) {
 				mm = new MemoryManager(1024 * 1024, 1);
-				ioMan = new IOManagerAsync();
 
 				TupleTypeInfo<Tuple2<StringValue, StringValue[]>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, StringValue[]>>)
 						new TypeHint<Tuple2<StringValue, StringValue[]>>(){}.getTypeInfo();
@@ -260,9 +253,6 @@ public class MassiveStringValueSorting {
 				if (mm != null) {
 					mm.shutdown();
 				}
-				if (ioMan != null) {
-					ioMan.shutdown();
-				}
 			}
 		}
 		catch (Exception e) {


[flink] 06/06: [hotfix][runtime] Remove legacy NoOpIOManager class

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a0f747dca5b22312c50056aacb7d5b6f8c3fa1ac
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Mon Jun 24 11:59:09 2019 +0800

    [hotfix][runtime] Remove legacy NoOpIOManager class
---
 .../runtime/io/disk/iomanager/NoOpIOManager.java   | 73 ----------------------
 1 file changed, 73 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/NoOpIOManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/NoOpIOManager.java
deleted file mode 100644
index f98c46f..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/NoOpIOManager.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.disk.iomanager;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- * An {@link IOManager} that cannot do I/O but serves as a mock for tests.
- */
-public class NoOpIOManager extends IOManager {
-
-	public NoOpIOManager() {
-		super(new String[] {EnvironmentInformation.getTemporaryFileDirectory()});
-	}
-
-	@Override
-	public BlockChannelWriter<MemorySegment> createBlockChannelWriter(ID channelID, LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public BlockChannelWriterWithCallback<MemorySegment> createBlockChannelWriter(ID channelID, RequestDoneCallback<MemorySegment> callback) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public BlockChannelReader<MemorySegment> createBlockChannelReader(ID channelID, LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public BufferFileWriter createBufferFileWriter(ID channelID) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public BufferFileReader createBufferFileReader(ID channelID, RequestDoneCallback<Buffer> callback) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public BufferFileSegmentReader createBufferFileSegmentReader(ID channelID, RequestDoneCallback<FileSegment> callback) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public BulkBlockChannelReader createBulkBlockChannelReader(ID channelID, List<MemorySegment> targetSegments, int numBlocks) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-}
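
For readers following the series, a minimal sketch of the resource-handling pattern these commits converge on: IOManager now implements AutoCloseable, so test code can scope it with try-with-resources instead of pairing a manual shutdown() call with a finally block. This is only an illustration; the no-argument IOManagerAsync constructor mirrors the diffs above, while the use of IOManager#createChannel and the main-method harness are assumptions added here for a self-contained example, not part of the patch.

    import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
    import org.apache.flink.runtime.io.disk.iomanager.IOManager;
    import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;

    public class IOManagerCloseSketch {

        public static void main(String[] args) throws Exception {
            // The IOManager is closed automatically when the try block exits,
            // replacing the explicit ioMan.shutdown() calls removed in this series.
            try (IOManager ioMan = new IOManagerAsync()) {
                // Hypothetical body: request a spill channel just to exercise the manager.
                FileIOChannel.ID channel = ioMan.createChannel();
                System.out.println("created channel " + channel);
            }
        }
    }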