You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/07/02 15:06:00 UTC

[flink] 05/06: [FLINK-12735][network] Make shuffle environment implementation independent of IOManager

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 80003d62f386fc95a3dbbc414f2cd4de7a26e1bd
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Thu Jun 27 00:29:16 2019 +0800

    [FLINK-12735][network] Make shuffle environment implementation independent of IOManager
    
    The current creation of NettyShuffleEnvironment relies on the IOManager from TaskManagerServices. Actually, the shuffle only needs a
    file channel when creating a partition, so it can internally create a lightweight FileChannelManager with its own directory name
    prefix instead of using the heavyweight IOManagerAsync.
---
 .../io/network/NettyShuffleEnvironment.java        | 13 ++++++
 .../io/network/NettyShuffleServiceFactory.java     | 17 +++++---
 .../network/partition/ResultPartitionFactory.java  | 16 +++----
 .../runtime/shuffle/ShuffleEnvironmentContext.java | 10 +----
 .../runtime/taskexecutor/TaskManagerServices.java  |  9 ++--
 .../NettyShuffleEnvironmentConfiguration.java      | 23 ++++++++--
 .../runtime/io/disk/NoOpFileChannelManager.java    | 51 ++++++++++++++++++++++
 .../io/network/NettyShuffleEnvironmentBuilder.java | 17 ++++----
 .../io/network/NettyShuffleEnvironmentTest.java    | 23 +++++++++-
 .../partition/BoundedBlockingSubpartitionTest.java | 24 +++++++++-
 .../BoundedBlockingSubpartitionWriteReadTest.java  | 21 ++++++++-
 .../io/network/partition/PartitionTestUtils.java   | 21 +++++++++
 .../network/partition/ResultPartitionBuilder.java  | 12 ++---
 .../partition/ResultPartitionFactoryTest.java      | 22 +++++++++-
 .../io/network/partition/ResultPartitionTest.java  | 27 +++++++++++-
 .../StreamNetworkBenchmarkEnvironment.java         |  7 ---
 16 files changed, 250 insertions(+), 63 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
index 5171d75..17fb2cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
@@ -85,6 +86,8 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 
 	private final ResultPartitionManager resultPartitionManager;
 
+	private final FileChannelManager fileChannelManager;
+
 	private final Map<InputGateID, SingleInputGate> inputGatesById;
 
 	private final ResultPartitionFactory resultPartitionFactory;
@@ -99,6 +102,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 			NetworkBufferPool networkBufferPool,
 			ConnectionManager connectionManager,
 			ResultPartitionManager resultPartitionManager,
+			FileChannelManager fileChannelManager,
 			ResultPartitionFactory resultPartitionFactory,
 			SingleInputGateFactory singleInputGateFactory) {
 		this.taskExecutorResourceId = taskExecutorResourceId;
@@ -107,6 +111,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 		this.connectionManager = connectionManager;
 		this.resultPartitionManager = resultPartitionManager;
 		this.inputGatesById = new ConcurrentHashMap<>(10);
+		this.fileChannelManager = fileChannelManager;
 		this.resultPartitionFactory = resultPartitionFactory;
 		this.singleInputGateFactory = singleInputGateFactory;
 		this.isClosed = false;
@@ -326,6 +331,14 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 				LOG.warn("Network buffer pool did not shut down properly.", t);
 			}
 
+			// delete all the temp directories
+			try {
+				fileChannelManager.close();
+			}
+			catch (Throwable t) {
+				LOG.warn("Cannot close the file channel manager properly.", t);
+			}
+
 			isClosed = true;
 		}
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index 74dbe0f..be31fb7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -22,7 +22,8 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
@@ -45,6 +46,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettyShuffleDescriptor, ResultPartition, SingleInputGate> {
 
+	private static final String DIR_NAME_PREFIX = "netty-shuffle";
+
 	@Override
 	public NettyShuffleMaster createShuffleMaster(Configuration configuration) {
 		return NettyShuffleMaster.INSTANCE;
@@ -62,8 +65,7 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
 			networkConfig,
 			shuffleEnvironmentContext.getTaskExecutorResourceId(),
 			shuffleEnvironmentContext.getEventPublisher(),
-			shuffleEnvironmentContext.getParentMetricGroup(),
-			shuffleEnvironmentContext.getIOManager());
+			shuffleEnvironmentContext.getParentMetricGroup());
 	}
 
 	@VisibleForTesting
@@ -71,18 +73,18 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
 			NettyShuffleEnvironmentConfiguration config,
 			ResourceID taskExecutorResourceId,
 			TaskEventPublisher taskEventPublisher,
-			MetricGroup metricGroup,
-			IOManager ioManager) {
+			MetricGroup metricGroup) {
 		checkNotNull(config);
 		checkNotNull(taskExecutorResourceId);
 		checkNotNull(taskEventPublisher);
 		checkNotNull(metricGroup);
-		checkNotNull(ioManager);
 
 		NettyConfig nettyConfig = config.nettyConfig();
 
 		ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
 
+		FileChannelManager fileChannelManager = new FileChannelManagerImpl(config.getTempDirs(), DIR_NAME_PREFIX);
+
 		ConnectionManager connectionManager = nettyConfig != null ?
 			new NettyConnectionManager(resultPartitionManager, taskEventPublisher, nettyConfig, config.isCreditBased()) :
 			new LocalConnectionManager();
@@ -96,7 +98,7 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
 
 		ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory(
 			resultPartitionManager,
-			ioManager,
+			fileChannelManager,
 			networkBufferPool,
 			config.networkBuffersPerChannel(),
 			config.floatingNetworkBuffersPerGate());
@@ -115,6 +117,7 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
 			networkBufferPool,
 			connectionManager,
 			resultPartitionManager,
+			fileChannelManager,
 			resultPartitionFactory,
 			singleInputGateFactory);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index 137ea5f..aaaecf6 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
@@ -52,7 +52,7 @@ public class ResultPartitionFactory {
 	private final ResultPartitionManager partitionManager;
 
 	@Nonnull
-	private final IOManager ioManager;
+	private final FileChannelManager channelManager;
 
 	@Nonnull
 	private final BufferPoolFactory bufferPoolFactory;
@@ -63,13 +63,13 @@ public class ResultPartitionFactory {
 
 	public ResultPartitionFactory(
 		@Nonnull ResultPartitionManager partitionManager,
-		@Nonnull IOManager ioManager,
+		@Nonnull FileChannelManager channelManager,
 		@Nonnull BufferPoolFactory bufferPoolFactory,
 		int networkBuffersPerChannel,
 		int floatingNetworkBuffersPerGate) {
 
 		this.partitionManager = partitionManager;
-		this.ioManager = ioManager;
+		this.channelManager = channelManager;
 		this.networkBuffersPerChannel = networkBuffersPerChannel;
 		this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
 		this.bufferPoolFactory = bufferPoolFactory;
@@ -135,7 +135,7 @@ public class ResultPartitionFactory {
 		// Create the subpartitions.
 		switch (type) {
 			case BLOCKING:
-				initializeBoundedBlockingPartitions(subpartitions, partition, ioManager, networkBufferSize);
+				initializeBoundedBlockingPartitions(subpartitions, partition, networkBufferSize, channelManager);
 				break;
 
 			case PIPELINED:
@@ -154,13 +154,13 @@ public class ResultPartitionFactory {
 	private static void initializeBoundedBlockingPartitions(
 		ResultSubpartition[] subpartitions,
 		ResultPartition parent,
-		IOManager ioManager,
-		int networkBufferSize) {
+		int networkBufferSize,
+		FileChannelManager channelManager) {
 
 		int i = 0;
 		try {
 			for (; i < subpartitions.length; i++) {
-				final File spillFile = ioManager.createChannel().getPathFile();
+				final File spillFile = channelManager.createChannel().getPathFile();
 				subpartitions[i] = BOUNDED_BLOCKING_TYPE.create(i, parent, spillFile, networkBufferSize);
 			}
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
index bde82eb..7c1abc3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.shuffle;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.TaskEventPublisher;
 
 import java.net.InetAddress;
@@ -39,7 +38,6 @@ public class ShuffleEnvironmentContext {
 	private final InetAddress hostAddress;
 	private final TaskEventPublisher eventPublisher;
 	private final MetricGroup parentMetricGroup;
-	private final IOManager ioManager;
 
 	public ShuffleEnvironmentContext(
 			Configuration configuration,
@@ -48,8 +46,7 @@ public class ShuffleEnvironmentContext {
 			boolean localCommunicationOnly,
 			InetAddress hostAddress,
 			TaskEventPublisher eventPublisher,
-			MetricGroup parentMetricGroup,
-			IOManager ioManager) {
+			MetricGroup parentMetricGroup) {
 		this.configuration = checkNotNull(configuration);
 		this.taskExecutorResourceId = checkNotNull(taskExecutorResourceId);
 		this.maxJvmHeapMemory = maxJvmHeapMemory;
@@ -57,7 +54,6 @@ public class ShuffleEnvironmentContext {
 		this.hostAddress = checkNotNull(hostAddress);
 		this.eventPublisher = checkNotNull(eventPublisher);
 		this.parentMetricGroup = checkNotNull(parentMetricGroup);
-		this.ioManager = checkNotNull(ioManager);
 	}
 
 	public Configuration getConfiguration() {
@@ -87,8 +83,4 @@ public class ShuffleEnvironmentContext {
 	public MetricGroup getParentMetricGroup() {
 		return parentMetricGroup;
 	}
-
-	public IOManager getIOManager() {
-		return ioManager;
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 0aafce0..8de72f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -244,8 +244,7 @@ public class TaskManagerServices {
 		final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
 			taskManagerServicesConfiguration,
 			taskEventDispatcher,
-			taskManagerMetricGroup,
-			ioManager);
+			taskManagerMetricGroup);
 		final int dataPort = shuffleEnvironment.start();
 
 		final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
@@ -307,8 +306,7 @@ public class TaskManagerServices {
 	private static ShuffleEnvironment<?, ?> createShuffleEnvironment(
 			TaskManagerServicesConfiguration taskManagerServicesConfiguration,
 			TaskEventDispatcher taskEventDispatcher,
-			MetricGroup taskManagerMetricGroup,
-			IOManager ioManager) throws FlinkException {
+			MetricGroup taskManagerMetricGroup) throws FlinkException {
 
 		final ShuffleEnvironmentContext shuffleEnvironmentContext = new ShuffleEnvironmentContext(
 			taskManagerServicesConfiguration.getConfiguration(),
@@ -317,8 +315,7 @@ public class TaskManagerServices {
 			taskManagerServicesConfiguration.isLocalCommunicationOnly(),
 			taskManagerServicesConfiguration.getTaskManagerAddress(),
 			taskEventDispatcher,
-			taskManagerMetricGroup,
-			ioManager);
+			taskManagerMetricGroup);
 
 		return ShuffleServiceLoader
 			.loadShuffleServiceFactory(taskManagerServicesConfiguration.getConfiguration())
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index a6ff17b..daccd3e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
@@ -28,6 +29,7 @@ import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,6 +37,7 @@ import javax.annotation.Nullable;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.Arrays;
 
 /**
  * Configuration object for the network stack.
@@ -62,6 +65,8 @@ public class NettyShuffleEnvironmentConfiguration {
 
 	private final NettyConfig nettyConfig;
 
+	private final String[] tempDirs;
+
 	public NettyShuffleEnvironmentConfiguration(
 			int numNetworkBuffers,
 			int networkBufferSize,
@@ -71,7 +76,8 @@ public class NettyShuffleEnvironmentConfiguration {
 			int floatingNetworkBuffersPerGate,
 			boolean isCreditBased,
 			boolean isNetworkDetailedMetrics,
-			@Nullable NettyConfig nettyConfig) {
+			@Nullable NettyConfig nettyConfig,
+			String[] tempDirs) {
 
 		this.numNetworkBuffers = numNetworkBuffers;
 		this.networkBufferSize = networkBufferSize;
@@ -82,6 +88,7 @@ public class NettyShuffleEnvironmentConfiguration {
 		this.isCreditBased = isCreditBased;
 		this.isNetworkDetailedMetrics = isNetworkDetailedMetrics;
 		this.nettyConfig = nettyConfig;
+		this.tempDirs = Preconditions.checkNotNull(tempDirs);
 	}
 
 	// ------------------------------------------------------------------------
@@ -122,6 +129,10 @@ public class NettyShuffleEnvironmentConfiguration {
 		return isNetworkDetailedMetrics;
 	}
 
+	public String[] getTempDirs() {
+		return tempDirs;
+	}
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -158,6 +169,8 @@ public class NettyShuffleEnvironmentConfiguration {
 
 		boolean isNetworkDetailedMetrics = configuration.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_DETAILED_METRICS);
 
+		String[] tempDirs = ConfigurationUtils.parseTempDirectories(configuration);
+
 		return new NettyShuffleEnvironmentConfiguration(
 			numberOfNetworkBuffers,
 			pageSize,
@@ -167,7 +180,8 @@ public class NettyShuffleEnvironmentConfiguration {
 			extraBuffersPerGate,
 			isCreditBased,
 			isNetworkDetailedMetrics,
-			nettyConfig);
+			nettyConfig,
+			tempDirs);
 	}
 
 	/**
@@ -467,6 +481,7 @@ public class NettyShuffleEnvironmentConfiguration {
 		result = 31 * result + floatingNetworkBuffersPerGate;
 		result = 31 * result + (isCreditBased ? 1 : 0);
 		result = 31 * result + (nettyConfig != null ? nettyConfig.hashCode() : 0);
+		result = 31 * result + Arrays.hashCode(tempDirs);
 		return result;
 	}
 
@@ -488,7 +503,8 @@ public class NettyShuffleEnvironmentConfiguration {
 					this.networkBuffersPerChannel == that.networkBuffersPerChannel &&
 					this.floatingNetworkBuffersPerGate == that.floatingNetworkBuffersPerGate &&
 					this.isCreditBased == that.isCreditBased &&
-					(nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null);
+					(nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null) &&
+					Arrays.equals(this.tempDirs, that.tempDirs);
 		}
 	}
 
@@ -503,6 +519,7 @@ public class NettyShuffleEnvironmentConfiguration {
 				", floatingNetworkBuffersPerGate=" + floatingNetworkBuffersPerGate +
 				", isCreditBased=" + isCreditBased +
 				", nettyConfig=" + nettyConfig +
+				", tempDirs=" + Arrays.toString(tempDirs) +
 				'}';
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/NoOpFileChannelManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/NoOpFileChannelManager.java
new file mode 100644
index 0000000..4fcf3ab
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/NoOpFileChannelManager.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
+
+import java.io.File;
+
+/**
+ * A {@link FileChannelManager} that cannot do I/O but serves as a mock for tests.
+ */
+public enum NoOpFileChannelManager implements FileChannelManager {
+
+	INSTANCE;
+
+	@Override
+	public ID createChannel() {
+		throw  new UnsupportedOperationException();
+	}
+
+	@Override
+	public Enumerator createChannelEnumerator() {
+		throw  new UnsupportedOperationException();
+	}
+
+	@Override
+	public File[] getPaths() {
+		throw  new UnsupportedOperationException();
+	}
+
+	@Override
+	public void close() {
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index b0ef430..32b3744 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -20,17 +20,18 @@ package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 
 /**
  * Builder for the {@link NettyShuffleEnvironment}.
  */
 public class NettyShuffleEnvironmentBuilder {
 
+	private static final String[] DEFAULT_TEMP_DIRS = new String[] {EnvironmentInformation.getTemporaryFileDirectory()};
+
 	private int numNetworkBuffers = 1024;
 
 	private int networkBufferSize = 32 * 1024;
@@ -55,7 +56,7 @@ public class NettyShuffleEnvironmentBuilder {
 
 	private MetricGroup metricGroup = UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup();
 
-	private IOManager ioManager = new IOManagerAsync();
+	private String[] tempDirs = DEFAULT_TEMP_DIRS;
 
 	public NettyShuffleEnvironmentBuilder setTaskManagerLocation(ResourceID taskManagerLocation) {
 		this.taskManagerLocation = taskManagerLocation;
@@ -112,8 +113,8 @@ public class NettyShuffleEnvironmentBuilder {
 		return this;
 	}
 
-	public NettyShuffleEnvironmentBuilder setIOManager(IOManager ioManager) {
-		this.ioManager = ioManager;
+	public NettyShuffleEnvironmentBuilder setTempDirs(String[] tempDirs) {
+		this.tempDirs = tempDirs;
 		return this;
 	}
 
@@ -128,10 +129,10 @@ public class NettyShuffleEnvironmentBuilder {
 				floatingNetworkBuffersPerGate,
 				isCreditBased,
 				isNetworkDetailedMetrics,
-				nettyConfig),
+				nettyConfig,
+				tempDirs),
 			taskManagerLocation,
 			taskEventDispatcher,
-			metricGroup,
-			ioManager);
+			metricGroup);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
index d4f0727..11951f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -28,8 +30,11 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
 import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -53,6 +58,10 @@ import static org.powermock.api.mockito.PowerMockito.spy;
 @RunWith(Parameterized.class)
 public class NettyShuffleEnvironmentTest extends TestLogger {
 
+	private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();
+
+	private static FileChannelManager fileChannelManager;
+
 	@Parameterized.Parameter
 	public boolean enableCreditBasedFlowControl;
 
@@ -64,6 +73,16 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 	@Rule
 	public ExpectedException expectedException = ExpectedException.none();
 
+	@BeforeClass
+	public static void setUp() {
+		fileChannelManager = new FileChannelManagerImpl(new String[] {tempDir}, "testing");
+	}
+
+	@AfterClass
+	public static void shutdown() throws Exception {
+		fileChannelManager.close();
+	}
+
 	/**
 	 * Verifies that {@link Task#setupPartitionsAndGates(ResultPartitionWriter[], InputGate[])}} sets up (un)bounded buffer pool
 	 * instances for various types of input and output channels.
@@ -76,7 +95,7 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 
 		// result partitions
 		ResultPartition rp1 = createPartition(network, ResultPartitionType.PIPELINED, 2);
-		ResultPartition rp2 = createPartition(network, ResultPartitionType.BLOCKING, 2);
+		ResultPartition rp2 = createPartition(network, fileChannelManager, ResultPartitionType.BLOCKING, 2);
 		ResultPartition rp3 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 2);
 		ResultPartition rp4 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 8);
 		final ResultPartition[] resultPartitions = new ResultPartition[] {rp1, rp2, rp3, rp4};
@@ -179,7 +198,7 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 
 		// result partitions
 		ResultPartition rp1 = createPartition(network, ResultPartitionType.PIPELINED, 2);
-		ResultPartition rp2 = createPartition(network, ResultPartitionType.BLOCKING, 2);
+		ResultPartition rp2 = createPartition(network, fileChannelManager, ResultPartitionType.BLOCKING, 2);
 		ResultPartition rp3 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 2);
 		ResultPartition rp4 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 4);
 		final ResultPartition[] resultPartitions = new ResultPartition[] {rp1, rp2, rp3, rp4};
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
index e76cd1e..9bd0c4b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
@@ -18,8 +18,13 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -29,6 +34,7 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 
+import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -41,9 +47,23 @@ import static org.junit.Assert.fail;
  */
 public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase {
 
+	private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();
+
+	private static FileChannelManager fileChannelManager;
+
 	@ClassRule
 	public static final TemporaryFolder TMP_DIR = new TemporaryFolder();
 
+	@BeforeClass
+	public static void setUp() {
+		fileChannelManager = new FileChannelManagerImpl(new String[] {tempDir}, "testing");
+	}
+
+	@AfterClass
+	public static void shutdown() throws Exception {
+		fileChannelManager.close();
+	}
+
 	// ------------------------------------------------------------------------
 
 	@Test
@@ -74,14 +94,14 @@ public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase {
 
 	@Override
 	ResultSubpartition createSubpartition() throws Exception {
-		final ResultPartition resultPartition = PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING);
+		final ResultPartition resultPartition = createPartition(ResultPartitionType.BLOCKING, fileChannelManager);
 		return BoundedBlockingSubpartition.createWithMemoryMappedFile(
 				0, resultPartition, new File(TMP_DIR.newFolder(), "subpartition"));
 	}
 
 	@Override
 	ResultSubpartition createFailingWritesSubpartition() throws Exception {
-		final ResultPartition resultPartition = PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING);
+		final ResultPartition resultPartition = createPartition(ResultPartitionType.BLOCKING, fileChannelManager);
 
 		return new BoundedBlockingSubpartition(
 				0,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
index 359ad8d..67ab6fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
@@ -21,9 +21,14 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -48,6 +53,10 @@ import static org.junit.Assert.assertTrue;
 @RunWith(Parameterized.class)
 public class BoundedBlockingSubpartitionWriteReadTest {
 
+	private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();
+
+	private static FileChannelManager fileChannelManager;
+
 	@ClassRule
 	public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
 
@@ -71,6 +80,16 @@ public class BoundedBlockingSubpartitionWriteReadTest {
 	//  tests
 	// ------------------------------------------------------------------------
 
+	@BeforeClass
+	public static void setUp() {
+		fileChannelManager = new FileChannelManagerImpl(new String[] {tempDir}, "testing");
+	}
+
+	@AfterClass
+	public static void shutdown() throws Exception {
+		fileChannelManager.close();
+	}
+
 	@Test
 	public void testWriteAndReadData() throws Exception {
 		final int numLongs = 15_000_000; // roughly 115 MiBytes
@@ -188,7 +207,7 @@ public class BoundedBlockingSubpartitionWriteReadTest {
 	private BoundedBlockingSubpartition createSubpartition() throws IOException {
 		return type.create(
 				0,
-				PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING),
+				PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING, fileChannelManager),
 				new File(TMP_FOLDER.newFolder(), "partitiondata"),
 				BUFFER_SIZE);
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index 51eff94..e559659 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
@@ -50,12 +51,32 @@ public class PartitionTestUtils {
 		return new ResultPartitionBuilder().setResultPartitionType(type).build();
 	}
 
+	public static ResultPartition createPartition(ResultPartitionType type, FileChannelManager channelManager) {
+		return new ResultPartitionBuilder()
+			.setResultPartitionType(type)
+			.setFileChannelManager(channelManager)
+			.build();
+	}
+
+	public static ResultPartition createPartition(
+			NettyShuffleEnvironment environment,
+			ResultPartitionType partitionType,
+			int numChannels) {
+		return new ResultPartitionBuilder()
+			.setupBufferPoolFactoryFromNettyShuffleEnvironment(environment)
+			.setResultPartitionType(partitionType)
+			.setNumberOfSubpartitions(numChannels)
+			.build();
+	}
+
 	public static ResultPartition createPartition(
 			NettyShuffleEnvironment environment,
+			FileChannelManager channelManager,
 			ResultPartitionType partitionType,
 			int numChannels) {
 		return new ResultPartitionBuilder()
 			.setupBufferPoolFactoryFromNettyShuffleEnvironment(environment)
+			.setFileChannelManager(channelManager)
 			.setResultPartitionType(partitionType)
 			.setNumberOfSubpartitions(numChannels)
 			.build();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index bf403ac..3da8eb8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.NoOpFileChannelManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
@@ -44,7 +44,7 @@ public class ResultPartitionBuilder {
 
 	private ResultPartitionManager partitionManager = new ResultPartitionManager();
 
-	private IOManager ioManager = new IOManagerAsync();
+	private FileChannelManager channelManager = NoOpFileChannelManager.INSTANCE;
 
 	private NetworkBufferPool networkBufferPool = new NetworkBufferPool(1, 1, 1);
 
@@ -82,8 +82,8 @@ public class ResultPartitionBuilder {
 		return this;
 	}
 
-	public ResultPartitionBuilder setIOManager(IOManager ioManager) {
-		this.ioManager = ioManager;
+	public ResultPartitionBuilder setFileChannelManager(FileChannelManager channelManager) {
+		this.channelManager = channelManager;
 		return this;
 	}
 
@@ -122,7 +122,7 @@ public class ResultPartitionBuilder {
 	public ResultPartition build() {
 		ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory(
 			partitionManager,
-			ioManager,
+			channelManager,
 			networkBufferPool,
 			networkBuffersPerChannel,
 			floatingNetworkBuffersPerGate);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index 0c6848b..f2e0e58 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -18,15 +18,19 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.io.disk.iomanager.NoOpIOManager;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
@@ -38,6 +42,20 @@ import static org.hamcrest.MatcherAssert.assertThat;
  */
 public class ResultPartitionFactoryTest extends TestLogger {
 
+	private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();
+
+	private static FileChannelManager fileChannelManager;
+
+	@BeforeClass
+	public static void setUp() {
+		fileChannelManager = new FileChannelManagerImpl(new String[] {tempDir}, "testing");
+	}
+
+	@AfterClass
+	public static void shutdown() throws Exception {
+		fileChannelManager.close();
+	}
+
 	@Test
 	public void testConsumptionOnReleaseEnabled() {
 		final ResultPartition resultPartition = createResultPartition(ShuffleDescriptor.ReleaseType.AUTO);
@@ -53,7 +71,7 @@ public class ResultPartitionFactoryTest extends TestLogger {
 	private static ResultPartition createResultPartition(ShuffleDescriptor.ReleaseType releaseType) {
 		ResultPartitionFactory factory = new ResultPartitionFactory(
 			new ResultPartitionManager(),
-			new NoOpIOManager(),
+			fileChannelManager,
 			new NetworkBufferPool(1, 64, 1),
 			1,
 			1
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 2072a12..1024780 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -28,8 +30,11 @@ import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.taskmanager.ConsumableNotifyingResultPartitionWriterDecorator;
 import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -55,6 +60,20 @@ import static org.mockito.Mockito.verify;
  */
 public class ResultPartitionTest {
 
+	private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();
+
+	private static FileChannelManager fileChannelManager;
+
+	@BeforeClass
+	public static void setUp() {
+		fileChannelManager = new FileChannelManagerImpl(new String[] {tempDir}, "testing");
+	}
+
+	@AfterClass
+	public static void shutdown() throws Exception {
+		fileChannelManager.close();
+	}
+
 	/**
 	 * Tests the schedule or update consumers message sending behaviour depending on the relevant flags.
 	 */
@@ -107,6 +126,7 @@ public class ResultPartitionTest {
 			.isReleasedOnConsumption(false)
 			.setResultPartitionManager(manager)
 			.setResultPartitionType(ResultPartitionType.BLOCKING)
+			.setFileChannelManager(fileChannelManager)
 			.build();
 
 		manager.registerResultPartition(partition);
@@ -181,7 +201,8 @@ public class ResultPartitionTest {
 		ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
 		JobID jobId = new JobID();
 		TaskActions taskActions = new NoOpTaskActions();
-		ResultPartition partition = createPartition(partitionType);
+		ResultPartition partition = partitionType == ResultPartitionType.BLOCKING ?
+			createPartition(partitionType, fileChannelManager) : createPartition(partitionType);
 		ResultPartitionWriter consumableNotifyingPartitionWriter = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
 			Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
 			new ResultPartitionWriter[] {partition},
@@ -311,9 +332,11 @@ public class ResultPartitionTest {
 			TaskActions taskActions,
 			JobID jobId,
 			ResultPartitionConsumableNotifier notifier) {
+		ResultPartition partition = partitionType == ResultPartitionType.BLOCKING ?
+			createPartition(partitionType, fileChannelManager) : createPartition(partitionType);
 		return ConsumableNotifyingResultPartitionWriterDecorator.decorate(
 			Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
-			new ResultPartitionWriter[] {createPartition(partitionType)},
+			new ResultPartitionWriter[] {partition},
 			taskActions,
 			jobId,
 			notifier)[0];
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index b7d7430..0cdc658 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -24,8 +24,6 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -81,7 +79,6 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 
 	protected NettyShuffleEnvironment senderEnv;
 	protected NettyShuffleEnvironment receiverEnv;
-	protected IOManager ioManager;
 
 	protected int channels;
 	protected boolean broadcastMode = false;
@@ -142,8 +139,6 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 			receiverBufferPoolSize = Math.max(2048, writers * channels * 4);
 		}
 
-		ioManager = new IOManagerAsync();
-
 		senderEnv = createShuffleEnvironment(senderBufferPoolSize, config);
 		this.dataPort = senderEnv.start();
 		if (localMode && senderBufferPoolSize == receiverBufferPoolSize) {
@@ -168,7 +163,6 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 	public void tearDown() {
 		suppressExceptions(senderEnv::close);
 		suppressExceptions(receiverEnv::close);
-		suppressExceptions(ioManager::close);
 	}
 
 	public SerializingLongReceiver createReceiver() throws Exception {
@@ -223,7 +217,6 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 			.setResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED)
 			.setNumberOfSubpartitions(channels)
 			.setResultPartitionManager(environment.getResultPartitionManager())
-			.setIOManager(ioManager)
 			.setupBufferPoolFactoryFromNettyShuffleEnvironment(environment)
 			.build();