You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/09 15:13:53 UTC

[2/6] flink git commit: [FLINK-8252][benchmarks] convert network benchmarks to streaming benchmarks

[FLINK-8252][benchmarks] convert network benchmarks to streaming benchmarks

This allows us to use the output flushing interval as a parameter to evaluate,
too.

This closes #5259.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/544c9703
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/544c9703
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/544c9703

Branch: refs/heads/master
Commit: 544c9703d97668b8d4a952501756db52156ff2ef
Parents: 6cfb758
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Dec 14 17:30:19 2017 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Tue Jan 9 16:10:51 2018 +0100

----------------------------------------------------------------------
 .../benchmark/LongRecordWriterThread.java       |  94 -------
 .../benchmark/NetworkBenchmarkEnvironment.java  | 278 -------------------
 .../benchmark/NetworkThroughputBenchmark.java   |  90 ------
 .../NetworkThroughputBenchmarkTests.java        |  74 -----
 .../io/network/benchmark/ReceiverThread.java    |  98 -------
 .../benchmark/SerializingLongReceiver.java      |  57 ----
 .../io/benchmark/LongRecordWriterThread.java    |  94 +++++++
 .../runtime/io/benchmark/ReceiverThread.java    |  98 +++++++
 .../io/benchmark/SerializingLongReceiver.java   |  57 ++++
 .../StreamNetworkBenchmarkEnvironment.java      | 257 ++++++++++++++++-
 .../StreamNetworkPointToPointBenchmark.java     |   3 +-
 .../StreamNetworkThroughputBenchmark.java       |  90 ++++++
 .../StreamNetworkThroughputBenchmarkTests.java  |  74 +++++
 13 files changed, 662 insertions(+), 702 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java
deleted file mode 100644
index 6018e55..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.benchmark;
-
-import org.apache.flink.core.testutils.CheckedThread;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.types.LongValue;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * Wrapping thread around {@link RecordWriter} that sends a fixed number of <tt>LongValue(0)</tt>
- * records.
- */
-public class LongRecordWriterThread extends CheckedThread {
-	private final RecordWriter<LongValue> recordWriter;
-
-	/**
-	 * Future to wait on a definition of the number of records to send.
-	 */
-	private CompletableFuture<Long> recordsToSend = new CompletableFuture<>();
-
-	private volatile boolean running = true;
-
-	public LongRecordWriterThread(RecordWriter<LongValue> recordWriter) {
-		this.recordWriter = checkNotNull(recordWriter);
-	}
-
-	public void shutdown() {
-		running = false;
-		recordsToSend.complete(0L);
-	}
-
-	/**
-	 * Initializes the record writer thread with this many numbers to send.
-	 *
-	 * <p>If the thread was already started, it may now continue.
-	 *
-	 * @param records
-	 * 		number of records to send
-	 */
-	public synchronized void setRecordsToSend(long records) {
-		checkState(!recordsToSend.isDone());
-		recordsToSend.complete(records);
-	}
-
-	private synchronized CompletableFuture<Long> getRecordsToSend() {
-		return recordsToSend;
-	}
-
-	private synchronized void finishSendingRecords() {
-		recordsToSend = new CompletableFuture<>();
-	}
-
-	@Override
-	public void go() throws Exception {
-		while (running) {
-			sendRecords(getRecordsToSend().get());
-		}
-	}
-
-	private void sendRecords(long records) throws IOException, InterruptedException {
-		LongValue value = new LongValue(0);
-
-		for (int i = 1; i < records; i++) {
-			recordWriter.emit(value);
-		}
-		value.setValue(records);
-		recordWriter.broadcastEmit(value);
-		recordWriter.flush();
-
-		finishSendingRecords();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java
deleted file mode 100644
index ff06187..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.benchmark;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.ResultPartitionLocation;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.ConnectionID;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.NettyConfig;
-import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
-import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.taskmanager.TaskActions;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Arrays;
-
-import static org.apache.flink.util.ExceptionUtils.suppressExceptions;
-
-/**
- * Context for network benchmarks executed by the external
- * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project.
- */
-public class NetworkBenchmarkEnvironment<T extends IOReadableWritable> {
-
-	private static final int BUFFER_SIZE = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue();
-
-	private static final int NUM_SLOTS_AND_THREADS = 1;
-
-	private static final InetAddress LOCAL_ADDRESS;
-
-	static {
-		try {
-			LOCAL_ADDRESS = InetAddress.getLocalHost();
-		} catch (UnknownHostException e) {
-			throw new Error(e);
-		}
-	}
-
-	protected final JobID jobId = new JobID();
-	protected final IntermediateDataSetID dataSetID = new IntermediateDataSetID();
-	protected final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
-
-	protected NetworkEnvironment senderEnv;
-	protected NetworkEnvironment receiverEnv;
-	protected IOManager ioManager;
-
-	protected int channels;
-
-	protected ResultPartitionID[] partitionIds;
-
-	public void setUp(int writers, int channels) throws Exception {
-		this.channels = channels;
-		this.partitionIds = new ResultPartitionID[writers];
-
-		int bufferPoolSize = Math.max(2048, writers * channels * 4);
-		senderEnv = createNettyNetworkEnvironment(bufferPoolSize);
-		receiverEnv = createNettyNetworkEnvironment(bufferPoolSize);
-		ioManager = new IOManagerAsync();
-
-		senderEnv.start();
-		receiverEnv.start();
-
-		generatePartitionIds();
-	}
-
-	public void tearDown() {
-		suppressExceptions(senderEnv::shutdown);
-		suppressExceptions(receiverEnv::shutdown);
-		suppressExceptions(ioManager::shutdown);
-	}
-
-	public SerializingLongReceiver createReceiver() throws Exception {
-		TaskManagerLocation senderLocation = new TaskManagerLocation(
-			ResourceID.generate(),
-			LOCAL_ADDRESS,
-			senderEnv.getConnectionManager().getDataPort());
-
-		InputGate receiverGate = createInputGate(
-			jobId,
-			dataSetID,
-			executionAttemptID,
-			senderLocation,
-			receiverEnv,
-			channels);
-
-		SerializingLongReceiver receiver = new SerializingLongReceiver(receiverGate, channels * partitionIds.length);
-
-		receiver.start();
-		return receiver;
-	}
-
-	public RecordWriter<T> createRecordWriter(int partitionIndex) throws Exception {
-		ResultPartitionWriter sender = createResultPartition(jobId, partitionIds[partitionIndex], senderEnv, channels);
-		return new RecordWriter<>(sender);
-	}
-
-	private void generatePartitionIds() throws Exception {
-		for (int writer = 0; writer < partitionIds.length; writer++) {
-			partitionIds[writer] = new ResultPartitionID();
-		}
-	}
-
-	private NetworkEnvironment createNettyNetworkEnvironment(
-			@SuppressWarnings("SameParameterValue") int bufferPoolSize) throws Exception {
-
-		final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, BUFFER_SIZE);
-
-		final NettyConnectionManager nettyConnectionManager = new NettyConnectionManager(
-			new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, NUM_SLOTS_AND_THREADS, new Configuration()));
-
-		return new NetworkEnvironment(
-			bufferPool,
-			nettyConnectionManager,
-			new ResultPartitionManager(),
-			new TaskEventDispatcher(),
-			new KvStateRegistry(),
-			null,
-			null,
-			IOMode.SYNC,
-			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
-			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
-			TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(),
-			TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue());
-	}
-
-	protected ResultPartitionWriter createResultPartition(
-			JobID jobId,
-			ResultPartitionID partitionId,
-			NetworkEnvironment environment,
-			int channels) throws Exception {
-
-		ResultPartition resultPartition = new ResultPartition(
-			"sender task",
-			new NoOpTaskActions(),
-			jobId,
-			partitionId,
-			ResultPartitionType.PIPELINED_BOUNDED,
-			channels,
-			1,
-			environment.getResultPartitionManager(),
-			new NoOpResultPartitionConsumableNotifier(),
-			ioManager,
-			false);
-
-		// similar to NetworkEnvironment#registerTask()
-		int numBuffers = resultPartition.getNumberOfSubpartitions() *
-			TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() +
-			TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue();
-
-		BufferPool bufferPool = environment.getNetworkBufferPool().createBufferPool(channels, numBuffers);
-		resultPartition.registerBufferPool(bufferPool);
-
-		environment.getResultPartitionManager().registerResultPartition(resultPartition);
-
-		return resultPartition;
-	}
-
-	private InputGate createInputGate(
-			JobID jobId,
-			IntermediateDataSetID dataSetID,
-			ExecutionAttemptID executionAttemptID,
-			final TaskManagerLocation senderLocation,
-			NetworkEnvironment environment,
-			final int channels) throws IOException {
-
-		InputGate[] gates = new InputGate[channels];
-		for (int channel = 0; channel < channels; ++channel) {
-			int finalChannel = channel;
-			InputChannelDeploymentDescriptor[] channelDescriptors = Arrays.stream(partitionIds)
-				.map(partitionId -> new InputChannelDeploymentDescriptor(
-					partitionId,
-					ResultPartitionLocation.createRemote(new ConnectionID(senderLocation, finalChannel))))
-				.toArray(InputChannelDeploymentDescriptor[]::new);
-
-			final InputGateDeploymentDescriptor gateDescriptor = new InputGateDeploymentDescriptor(
-				dataSetID,
-				ResultPartitionType.PIPELINED_BOUNDED,
-				channel,
-				channelDescriptors);
-
-			SingleInputGate gate = SingleInputGate.create(
-				"receiving task[" + channel + "]",
-				jobId,
-				executionAttemptID,
-				gateDescriptor,
-				environment,
-				new NoOpTaskActions(),
-				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
-
-			// similar to NetworkEnvironment#registerTask()
-			int numBuffers = gate.getNumberOfInputChannels() *
-				TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() +
-				TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue();
-
-			BufferPool bufferPool =
-				environment.getNetworkBufferPool().createBufferPool(gate.getNumberOfInputChannels(), numBuffers);
-
-			gate.setBufferPool(bufferPool);
-			gates[channel] = gate;
-		}
-
-		if (channels > 1) {
-			return new UnionInputGate(gates);
-		} else {
-			return gates[0];
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Mocks
-	// ------------------------------------------------------------------------
-
-	/**
-	 * A dummy implementation of the {@link TaskActions}. We implement this here rather than using Mockito
-	 * to avoid using mockito in this benchmark class.
-	 */
-	private static final class NoOpTaskActions implements TaskActions {
-
-		@Override
-		public void triggerPartitionProducerStateCheck(
-			JobID jobId,
-			IntermediateDataSetID intermediateDataSetId,
-			ResultPartitionID resultPartitionId) {}
-
-		@Override
-		public void failExternally(Throwable cause) {}
-	}
-
-	private static final class NoOpResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
-
-		@Override
-		public void notifyPartitionConsumable(JobID j, ResultPartitionID p, TaskActions t) {}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java
deleted file mode 100644
index 799b7c3..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.benchmark;
-
-import org.apache.flink.types.LongValue;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Network throughput benchmarks executed by the external
- * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project.
- */
-public class NetworkThroughputBenchmark {
-	private static final long RECEIVER_TIMEOUT = 30_000;
-
-	private NetworkBenchmarkEnvironment<LongValue> environment;
-	private ReceiverThread receiver;
-	private LongRecordWriterThread[] writerThreads;
-
-	/**
-	 * Executes the throughput benchmark with the given number of records.
-	 *
-	 * @param records
-	 * 		records to pass through the network stack
-	 */
-	public void executeBenchmark(long records) throws Exception {
-		final LongValue value = new LongValue();
-		value.setValue(0);
-
-		long lastRecord = records / writerThreads.length;
-		CompletableFuture<?> recordsReceived = receiver.setExpectedRecord(lastRecord);
-
-		for (LongRecordWriterThread writerThread : writerThreads) {
-			writerThread.setRecordsToSend(lastRecord);
-		}
-
-		recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS);
-	}
-
-	/**
-	 * Initializes the throughput benchmark with the given parameters.
-	 *
-	 * @param recordWriters
-	 * 		number of senders, i.e.
-	 * 		{@link org.apache.flink.runtime.io.network.api.writer.RecordWriter} instances
-	 * @param channels
-	 * 		number of outgoing channels / receivers
-	 */
-	public void setUp(int recordWriters, int channels) throws Exception {
-		environment = new NetworkBenchmarkEnvironment<>();
-		environment.setUp(recordWriters, channels);
-		receiver = environment.createReceiver();
-		writerThreads = new LongRecordWriterThread[recordWriters];
-		for (int writer = 0; writer < recordWriters; writer++) {
-			writerThreads[writer] = new LongRecordWriterThread(environment.createRecordWriter(writer));
-			writerThreads[writer].start();
-		}
-	}
-
-	/**
-	 * Shuts down a benchmark previously set up via {@link #setUp}.
-	 *
-	 * <p>This will wait for all senders to finish but time out with an exception after 5 seconds.
-	 */
-	public void tearDown() throws Exception {
-		for (LongRecordWriterThread writerThread : writerThreads) {
-			writerThread.shutdown();
-			writerThread.sync(5000);
-		}
-		environment.tearDown();
-		receiver.shutdown();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java
deleted file mode 100644
index c84743b..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.benchmark;
-
-import org.junit.Test;
-
-/**
- * Tests for various network benchmarks based on {@link NetworkThroughputBenchmark}.
- */
-public class NetworkThroughputBenchmarkTests {
-	@Test
-	public void pointToPointBenchmark() throws Exception {
-		NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark();
-		benchmark.setUp(1, 1);
-		try {
-			benchmark.executeBenchmark(1_000);
-		}
-		finally {
-			benchmark.tearDown();
-		}
-	}
-
-	@Test
-	public void pointToMultiPointBenchmark() throws Exception {
-		NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark();
-		benchmark.setUp(1, 100);
-		try {
-			benchmark.executeBenchmark(1_000);
-		}
-		finally {
-			benchmark.tearDown();
-		}
-	}
-
-	@Test
-	public void multiPointToPointBenchmark() throws Exception {
-		NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark();
-		benchmark.setUp(4, 1);
-		try {
-			benchmark.executeBenchmark(1_000);
-		}
-		finally {
-			benchmark.tearDown();
-		}
-	}
-
-	@Test
-	public void multiPointToMultiPointBenchmark() throws Exception {
-		NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark();
-		benchmark.setUp(4, 100);
-		try {
-			benchmark.executeBenchmark(1_000);
-		}
-		finally {
-			benchmark.tearDown();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java
deleted file mode 100644
index be1c80f..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.benchmark;
-
-import org.apache.flink.core.testutils.CheckedThread;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * This class waits for {@code expectedRepetitionsOfExpectedRecord} number of occurrences of the
- * {@code expectedRecord}. {@code expectedRepetitionsOfExpectedRecord} is correlated with the number of input channels.
- */
-public abstract class ReceiverThread extends CheckedThread {
-	protected static final Logger LOG = LoggerFactory.getLogger(ReceiverThread.class);
-
-	protected final int expectedRepetitionsOfExpectedRecord;
-
-	protected int expectedRecordCounter;
-	protected CompletableFuture<Long> expectedRecord = new CompletableFuture<>();
-	protected CompletableFuture<?> recordsProcessed = new CompletableFuture<>();
-
-	protected volatile boolean running;
-
-	ReceiverThread(int expectedRepetitionsOfExpectedRecord) {
-		setName(this.getClass().getName());
-
-		this.expectedRepetitionsOfExpectedRecord = expectedRepetitionsOfExpectedRecord;
-		this.running = true;
-	}
-
-	public synchronized CompletableFuture<?> setExpectedRecord(long record) {
-		checkState(!expectedRecord.isDone());
-		checkState(!recordsProcessed.isDone());
-		expectedRecord.complete(record);
-		expectedRecordCounter = 0;
-		return recordsProcessed;
-	}
-
-	private synchronized CompletableFuture<Long> getExpectedRecord() {
-		return expectedRecord;
-	}
-
-	private synchronized void finishProcessingExpectedRecords() {
-		checkState(expectedRecord.isDone());
-		checkState(!recordsProcessed.isDone());
-
-		recordsProcessed.complete(null);
-		expectedRecord = new CompletableFuture<>();
-		recordsProcessed = new CompletableFuture<>();
-	}
-
-	@Override
-	public void go() throws Exception {
-		try {
-			while (running) {
-				readRecords(getExpectedRecord().get());
-				finishProcessingExpectedRecords();
-			}
-		}
-		catch (InterruptedException e) {
-			if (running) {
-				throw e;
-			}
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-	}
-
-	protected abstract void readRecords(long lastExpectedRecord) throws Exception;
-
-	public void shutdown() {
-		running = false;
-		interrupt();
-		expectedRecord.complete(0L);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java
deleted file mode 100644
index 848c018..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.benchmark;
-
-import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.types.LongValue;
-
-/**
- * {@link ReceiverThread} that deserializes incoming messages.
- */
-public class SerializingLongReceiver extends ReceiverThread {
-
-	private final MutableRecordReader<LongValue> reader;
-
-	@SuppressWarnings("WeakerAccess")
-	public SerializingLongReceiver(InputGate inputGate, int expectedRepetitionsOfExpectedRecord) {
-		super(expectedRepetitionsOfExpectedRecord);
-		this.reader = new MutableRecordReader<>(
-			inputGate,
-			new String[]{
-				EnvironmentInformation.getTemporaryFileDirectory()
-			});
-	}
-
-	protected void readRecords(long lastExpectedRecord) throws Exception {
-		LOG.debug("readRecords(lastExpectedRecord = {})", lastExpectedRecord);
-		final LongValue value = new LongValue();
-
-		while (running && reader.next(value)) {
-			final long ts = value.getValue();
-			if (ts == lastExpectedRecord) {
-				expectedRecordCounter++;
-				if (expectedRecordCounter == expectedRepetitionsOfExpectedRecord) {
-					break;
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
new file mode 100644
index 0000000..e6cc2d5
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.types.LongValue;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Wrapping thread around {@link RecordWriter} that sends a fixed number of <tt>LongValue(0)</tt>
+ * records.
+ */
+public class LongRecordWriterThread extends CheckedThread {
+	private final RecordWriter<LongValue> recordWriter;
+
+	/**
+	 * Future to wait on a definition of the number of records to send.
+	 */
+	private CompletableFuture<Long> recordsToSend = new CompletableFuture<>();
+
+	private volatile boolean running = true;
+
+	public LongRecordWriterThread(RecordWriter<LongValue> recordWriter) {
+		this.recordWriter = checkNotNull(recordWriter);
+	}
+
+	public void shutdown() {
+		running = false;
+		recordsToSend.complete(0L);
+	}
+
+	/**
+	 * Initializes the record writer thread with the number of records to send.
+	 *
+	 * <p>If the thread was already started, it may now continue.
+	 *
+	 * @param records
+	 * 		number of records to send
+	 */
+	public synchronized void setRecordsToSend(long records) {
+		checkState(!recordsToSend.isDone());
+		recordsToSend.complete(records);
+	}
+
+	private synchronized CompletableFuture<Long> getRecordsToSend() {
+		return recordsToSend;
+	}
+
+	private synchronized void finishSendingRecords() {
+		recordsToSend = new CompletableFuture<>();
+	}
+
+	@Override
+	public void go() throws Exception {
+		while (running) {
+			sendRecords(getRecordsToSend().get());
+		}
+	}
+
+	private void sendRecords(long records) throws IOException, InterruptedException {
+		LongValue value = new LongValue(0);
+
+		for (int i = 1; i < records; i++) {
+			recordWriter.emit(value);
+		}
+		value.setValue(records);
+		recordWriter.broadcastEmit(value);
+		recordWriter.flush();
+
+		finishSendingRecords();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/ReceiverThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/ReceiverThread.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/ReceiverThread.java
new file mode 100644
index 0000000..126efef
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/ReceiverThread.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.apache.flink.core.testutils.CheckedThread;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This class waits for {@code expectedRepetitionsOfExpectedRecord} number of occurrences of the
+ * {@code expectedRecord}. {@code expectedRepetitionsOfExpectedRecord} is correlated with the number of input channels.
+ */
+public abstract class ReceiverThread extends CheckedThread {
+	protected static final Logger LOG = LoggerFactory.getLogger(ReceiverThread.class);
+
+	protected final int expectedRepetitionsOfExpectedRecord;
+
+	protected int expectedRecordCounter;
+	protected CompletableFuture<Long> expectedRecord = new CompletableFuture<>();
+	protected CompletableFuture<?> recordsProcessed = new CompletableFuture<>();
+
+	protected volatile boolean running;
+
+	ReceiverThread(int expectedRepetitionsOfExpectedRecord) {
+		setName(this.getClass().getName());
+
+		this.expectedRepetitionsOfExpectedRecord = expectedRepetitionsOfExpectedRecord;
+		this.running = true;
+	}
+
+	public synchronized CompletableFuture<?> setExpectedRecord(long record) {
+		checkState(!expectedRecord.isDone());
+		checkState(!recordsProcessed.isDone());
+		expectedRecord.complete(record);
+		expectedRecordCounter = 0;
+		return recordsProcessed;
+	}
+
+	private synchronized CompletableFuture<Long> getExpectedRecord() {
+		return expectedRecord;
+	}
+
+	private synchronized void finishProcessingExpectedRecords() {
+		checkState(expectedRecord.isDone());
+		checkState(!recordsProcessed.isDone());
+
+		recordsProcessed.complete(null);
+		expectedRecord = new CompletableFuture<>();
+		recordsProcessed = new CompletableFuture<>();
+	}
+
+	@Override
+	public void go() throws Exception {
+		try {
+			while (running) {
+				readRecords(getExpectedRecord().get());
+				finishProcessingExpectedRecords();
+			}
+		}
+		catch (InterruptedException e) {
+			if (running) {
+				throw e;
+			}
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+	}
+
+	protected abstract void readRecords(long lastExpectedRecord) throws Exception;
+
+	public void shutdown() {
+		running = false;
+		interrupt();
+		expectedRecord.complete(0L);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java
new file mode 100644
index 0000000..580612c
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.types.LongValue;
+
+/**
+ * {@link ReceiverThread} that deserializes incoming messages.
+ */
+public class SerializingLongReceiver extends ReceiverThread {
+
+	private final MutableRecordReader<LongValue> reader;
+
+	@SuppressWarnings("WeakerAccess")
+	public SerializingLongReceiver(InputGate inputGate, int expectedRepetitionsOfExpectedRecord) {
+		super(expectedRepetitionsOfExpectedRecord);
+		this.reader = new MutableRecordReader<>(
+			inputGate,
+			new String[]{
+				EnvironmentInformation.getTemporaryFileDirectory()
+			});
+	}
+
+	protected void readRecords(long lastExpectedRecord) throws Exception {
+		LOG.debug("readRecords(lastExpectedRecord = {})", lastExpectedRecord);
+		final LongValue value = new LongValue();
+
+		while (running && reader.next(value)) {
+			final long ts = value.getValue();
+			if (ts == lastExpectedRecord) {
+				expectedRecordCounter++;
+				if (expectedRecordCounter == expectedRepetitionsOfExpectedRecord) {
+					break;
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index acbbdf8..83508ea 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -18,23 +18,262 @@
 
 package org.apache.flink.streaming.runtime.io.benchmark;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionLocation;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
-import org.apache.flink.runtime.io.network.benchmark.NetworkBenchmarkEnvironment;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
 
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+
+import static org.apache.flink.util.ExceptionUtils.suppressExceptions;
+
 /**
- * Context for stream network benchmarks executed by the external
+ * Context for network benchmarks executed by the external
  * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project.
  */
-public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> extends NetworkBenchmarkEnvironment<T> {
+public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
+
+	private static final int BUFFER_SIZE = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue();
+
+	private static final int NUM_SLOTS_AND_THREADS = 1;
+
+	private static final InetAddress LOCAL_ADDRESS;
+
+	static {
+		try {
+			LOCAL_ADDRESS = InetAddress.getLocalHost();
+		} catch (UnknownHostException e) {
+			throw new Error(e);
+		}
+	}
+
+	protected final JobID jobId = new JobID();
+	protected final IntermediateDataSetID dataSetID = new IntermediateDataSetID();
+	protected final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
+
+	protected NetworkEnvironment senderEnv;
+	protected NetworkEnvironment receiverEnv;
+	protected IOManager ioManager;
+
+	protected int channels;
+
+	protected ResultPartitionID[] partitionIds;
+
+	public void setUp(int writers, int channels) throws Exception {
+		this.channels = channels;
+		this.partitionIds = new ResultPartitionID[writers];
+
+		int bufferPoolSize = Math.max(2048, writers * channels * 4);
+		senderEnv = createNettyNetworkEnvironment(bufferPoolSize);
+		receiverEnv = createNettyNetworkEnvironment(bufferPoolSize);
+		ioManager = new IOManagerAsync();
+
+		senderEnv.start();
+		receiverEnv.start();
+
+		generatePartitionIds();
+	}
+
+	public void tearDown() {
+		suppressExceptions(senderEnv::shutdown);
+		suppressExceptions(receiverEnv::shutdown);
+		suppressExceptions(ioManager::shutdown);
+	}
+
+	public SerializingLongReceiver createReceiver() throws Exception {
+		TaskManagerLocation senderLocation = new TaskManagerLocation(
+			ResourceID.generate(),
+			LOCAL_ADDRESS,
+			senderEnv.getConnectionManager().getDataPort());
+
+		InputGate receiverGate = createInputGate(
+			jobId,
+			dataSetID,
+			executionAttemptID,
+			senderLocation,
+			receiverEnv,
+			channels);
+
+		SerializingLongReceiver receiver = new SerializingLongReceiver(receiverGate, channels * partitionIds.length);
+
+		receiver.start();
+		return receiver;
+	}
+
+	public StreamRecordWriter<T> createRecordWriter(int partitionIndex, long flushTimeout) throws Exception {
+		ResultPartitionWriter sender = createResultPartition(jobId, partitionIds[partitionIndex], senderEnv, channels);
+		return new StreamRecordWriter<>(sender,  new RoundRobinChannelSelector<T>(), flushTimeout);
+	}
+
+	private void generatePartitionIds() throws Exception {
+		for (int writer = 0; writer < partitionIds.length; writer++) {
+			partitionIds[writer] = new ResultPartitionID();
+		}
+	}
+
+	private NetworkEnvironment createNettyNetworkEnvironment(
+			@SuppressWarnings("SameParameterValue") int bufferPoolSize) throws Exception {
+
+		final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, BUFFER_SIZE);
+
+		final NettyConnectionManager nettyConnectionManager = new NettyConnectionManager(
+			new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, NUM_SLOTS_AND_THREADS, new Configuration()));
+
+		return new NetworkEnvironment(
+			bufferPool,
+			nettyConnectionManager,
+			new ResultPartitionManager(),
+			new TaskEventDispatcher(),
+			new KvStateRegistry(),
+			null,
+			null,
+			IOMode.SYNC,
+			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
+			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
+			TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(),
+			TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue());
+	}
+
+	protected ResultPartitionWriter createResultPartition(
+			JobID jobId,
+			ResultPartitionID partitionId,
+			NetworkEnvironment environment,
+			int channels) throws Exception {
+
+		ResultPartition resultPartition = new ResultPartition(
+			"sender task",
+			new NoOpTaskActions(),
+			jobId,
+			partitionId,
+			ResultPartitionType.PIPELINED_BOUNDED,
+			channels,
+			1,
+			environment.getResultPartitionManager(),
+			new NoOpResultPartitionConsumableNotifier(),
+			ioManager,
+			false);
+
+		// similar to NetworkEnvironment#registerTask()
+		int numBuffers = resultPartition.getNumberOfSubpartitions() *
+			TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() +
+			TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue();
+
+		BufferPool bufferPool = environment.getNetworkBufferPool().createBufferPool(channels, numBuffers);
+		resultPartition.registerBufferPool(bufferPool);
+
+		environment.getResultPartitionManager().registerResultPartition(resultPartition);
+
+		return resultPartition;
+	}
+
+	private InputGate createInputGate(
+			JobID jobId,
+			IntermediateDataSetID dataSetID,
+			ExecutionAttemptID executionAttemptID,
+			final TaskManagerLocation senderLocation,
+			NetworkEnvironment environment,
+			final int channels) throws IOException {
+
+		InputGate[] gates = new InputGate[channels];
+		for (int channel = 0; channel < channels; ++channel) {
+			int finalChannel = channel;
+			InputChannelDeploymentDescriptor[] channelDescriptors = Arrays.stream(partitionIds)
+				.map(partitionId -> new InputChannelDeploymentDescriptor(
+					partitionId,
+					ResultPartitionLocation.createRemote(new ConnectionID(senderLocation, finalChannel))))
+				.toArray(InputChannelDeploymentDescriptor[]::new);
+
+			final InputGateDeploymentDescriptor gateDescriptor = new InputGateDeploymentDescriptor(
+				dataSetID,
+				ResultPartitionType.PIPELINED_BOUNDED,
+				channel,
+				channelDescriptors);
+
+			SingleInputGate gate = SingleInputGate.create(
+				"receiving task[" + channel + "]",
+				jobId,
+				executionAttemptID,
+				gateDescriptor,
+				environment,
+				new NoOpTaskActions(),
+				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+
+			// similar to NetworkEnvironment#registerTask()
+			int numBuffers = gate.getNumberOfInputChannels() *
+				TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() +
+				TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue();
+
+			BufferPool bufferPool =
+				environment.getNetworkBufferPool().createBufferPool(gate.getNumberOfInputChannels(), numBuffers);
+
+			gate.setBufferPool(bufferPool);
+			gates[channel] = gate;
+		}
+
+		if (channels > 1) {
+			return new UnionInputGate(gates);
+		} else {
+			return gates[0];
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Mocks
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A no-op implementation of {@link TaskActions}. It is written by hand rather than mocked
+	 * so that this benchmark class does not depend on Mockito.
+	 */
+	private static final class NoOpTaskActions implements TaskActions {
+
+		@Override
+		public void triggerPartitionProducerStateCheck(
+			JobID jobId,
+			IntermediateDataSetID intermediateDataSetId,
+			ResultPartitionID resultPartitionId) {}
+
+		@Override
+		public void failExternally(Throwable cause) {}
+	}
+
+	private static final class NoOpResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
 
-	public RecordWriter<T> createStreamRecordWriter(int partitionIndex, long flushTimeout)
-			throws Exception {
-		ResultPartitionWriter sender =
-			createResultPartition(jobId, partitionIds[partitionIndex], senderEnv, channels);
-		return new StreamRecordWriter<>(sender, new RoundRobinChannelSelector<>(), flushTimeout);
+		@Override
+		public void notifyPartitionConsumable(JobID j, ResultPartitionID p, TaskActions t) {}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
index 9286485..843d3e2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.runtime.io.benchmark;
 
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.benchmark.ReceiverThread;
 import org.apache.flink.types.LongValue;
 
 import java.util.concurrent.CompletableFuture;
@@ -74,7 +73,7 @@ public class StreamNetworkPointToPointBenchmark {
 		environment.setUp(1, 1);
 
 		receiver = environment.createReceiver();
-		recordWriter = environment.createStreamRecordWriter(0, flushTimeout);
+		recordWriter = environment.createRecordWriter(0, flushTimeout);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
new file mode 100644
index 0000000..3f41b00
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.apache.flink.types.LongValue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Network throughput benchmarks executed by the external
+ * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project.
+ */
+public class StreamNetworkThroughputBenchmark {
+	private static final long RECEIVER_TIMEOUT = 30_000;
+
+	private StreamNetworkBenchmarkEnvironment<LongValue> environment;
+	private ReceiverThread receiver;
+	private LongRecordWriterThread[] writerThreads;
+
+	/**
+	 * Executes the throughput benchmark with the given number of records.
+	 *
+	 * @param records
+	 * 		records to pass through the network stack
+	 */
+	public void executeBenchmark(long records) throws Exception {
+		final LongValue value = new LongValue();
+		value.setValue(0);
+
+		long lastRecord = records / writerThreads.length;
+		CompletableFuture<?> recordsReceived = receiver.setExpectedRecord(lastRecord);
+
+		for (LongRecordWriterThread writerThread : writerThreads) {
+			writerThread.setRecordsToSend(lastRecord);
+		}
+
+		recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS);
+	}
+
+	/**
+	 * Initializes the throughput benchmark with the given parameters.
+	 *
+	 * @param recordWriters
+	 * 		number of senders, i.e.
+	 * 		{@link org.apache.flink.runtime.io.network.api.writer.RecordWriter} instances
+	 * @param channels
+	 * 		number of outgoing channels / receivers
+	 */
+	public void setUp(int recordWriters, int channels, int flushTimeout) throws Exception {
+		environment = new StreamNetworkBenchmarkEnvironment<>();
+		environment.setUp(recordWriters, channels);
+		receiver = environment.createReceiver();
+		writerThreads = new LongRecordWriterThread[recordWriters];
+		for (int writer = 0; writer < recordWriters; writer++) {
+			writerThreads[writer] = new LongRecordWriterThread(environment.createRecordWriter(writer, flushTimeout));
+			writerThreads[writer].start();
+		}
+	}
+
+	/**
+	 * Shuts down a benchmark previously set up via {@link #setUp}.
+	 *
+	 * <p>This will wait for all senders to finish but time out with an exception after 5 seconds.
+	 */
+	public void tearDown() throws Exception {
+		for (LongRecordWriterThread writerThread : writerThreads) {
+			writerThread.shutdown();
+			writerThread.sync(5000);
+		}
+		environment.tearDown();
+		receiver.shutdown();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java
new file mode 100644
index 0000000..8af8148
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.junit.Test;
+
+/**
+ * Tests for various network benchmarks based on {@link StreamNetworkThroughputBenchmark}.
+ */
+public class StreamNetworkThroughputBenchmarkTests {
+	@Test
+	public void pointToPointBenchmark() throws Exception {
+		StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
+		benchmark.setUp(1, 1, 100);
+		try {
+			benchmark.executeBenchmark(1_000);
+		}
+		finally {
+			benchmark.tearDown();
+		}
+	}
+
+	@Test
+	public void pointToMultiPointBenchmark() throws Exception {
+		StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
+		benchmark.setUp(1, 100, 100);
+		try {
+			benchmark.executeBenchmark(1_000);
+		}
+		finally {
+			benchmark.tearDown();
+		}
+	}
+
+	@Test
+	public void multiPointToPointBenchmark() throws Exception {
+		StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
+		benchmark.setUp(4, 1, 100);
+		try {
+			benchmark.executeBenchmark(1_000);
+		}
+		finally {
+			benchmark.tearDown();
+		}
+	}
+
+	@Test
+	public void multiPointToMultiPointBenchmark() throws Exception {
+		StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
+		benchmark.setUp(4, 100, 100);
+		try {
+			benchmark.executeBenchmark(1_000);
+		}
+		finally {
+			benchmark.tearDown();
+		}
+	}
+}