Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/08 13:04:08 UTC

[14/15] flink git commit: [FLINK-8220][network-benchmarks] Define network benchmarks in Flink project

[FLINK-8220][network-benchmarks] Define network benchmarks in Flink project


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c8161911
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c8161911
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c8161911

Branch: refs/heads/master
Commit: c816191113d813156467f3e33856636ef0bcce38
Parents: 81d3e72
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Thu Dec 7 10:03:32 2017 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Mon Jan 8 11:46:01 2018 +0100

----------------------------------------------------------------------
 .../benchmark/LongRecordWriterThread.java       |  94 +++++++
 .../benchmark/NetworkBenchmarkEnvironment.java  | 278 +++++++++++++++++++
 .../benchmark/NetworkThroughputBenchmark.java   |  90 ++++++
 .../NetworkThroughputBenchmarkTests.java        |  74 +++++
 .../io/network/benchmark/ReceiverThread.java    |  98 +++++++
 .../benchmark/SerializingLongReceiver.java      |  57 ++++
 6 files changed, 691 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c8161911/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java
new file mode 100644
index 0000000..6018e55
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.benchmark;
+
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.types.LongValue;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Wrapper thread around {@link RecordWriter} that sends a fixed number of <tt>LongValue(0)</tt>
+ * records.
+ */
+public class LongRecordWriterThread extends CheckedThread {
+	private final RecordWriter<LongValue> recordWriter;
+
+	/**
+	 * Future that the writer loop waits on; it is completed with the number of records to send.
+	 */
+	private CompletableFuture<Long> recordsToSend = new CompletableFuture<>();
+
+	private volatile boolean running = true;
+
+	public LongRecordWriterThread(RecordWriter<LongValue> recordWriter) {
+		this.recordWriter = checkNotNull(recordWriter);
+	}
+
+	public void shutdown() {
+		running = false;
+		recordsToSend.complete(0L);
+	}
+
+	/**
+	 * Initializes the record writer thread with this many records to send.
+	 *
+	 * <p>If the thread was already started, it may now continue.
+	 *
+	 * @param records
+	 * 		number of records to send
+	 */
+	public synchronized void setRecordsToSend(long records) {
+		checkState(!recordsToSend.isDone());
+		recordsToSend.complete(records);
+	}
+
+	private synchronized CompletableFuture<Long> getRecordsToSend() {
+		return recordsToSend;
+	}
+
+	private synchronized void finishSendingRecords() {
+		recordsToSend = new CompletableFuture<>();
+	}
+
+	@Override
+	public void go() throws Exception {
+		while (running) {
+			sendRecords(getRecordsToSend().get());
+		}
+	}
+
+	private void sendRecords(long records) throws IOException, InterruptedException {
+		LongValue value = new LongValue(0);
+
+		for (int i = 1; i < records; i++) {
+			recordWriter.emit(value);
+		}
+		value.setValue(records);
+		recordWriter.broadcastEmit(value);
+		recordWriter.flush();
+
+		finishSendingRecords();
+	}
+}

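For orientation, a minimal driver sketch for the class above (illustrative only, not part of this commit; the RecordWriter is expected to come from NetworkBenchmarkEnvironment#createRecordWriter further down in this diff, and the sketch class name is made up):

package org.apache.flink.runtime.io.network.benchmark;

import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.types.LongValue;

/**
 * Hypothetical usage sketch, not part of this commit.
 */
public class LongRecordWriterThreadSketch {

	static void runWriterOnce(RecordWriter<LongValue> recordWriter, long records) throws Exception {
		LongRecordWriterThread writerThread = new LongRecordWriterThread(recordWriter);
		writerThread.start();

		// Unblocks one iteration of go(): (records - 1) zero-valued LongValues are emitted,
		// then one broadcast record carrying the expected count, followed by a flush.
		writerThread.setRecordsToSend(records);

		// A real benchmark waits on the receiver's future here before shutting the writer down.
		writerThread.shutdown();
		writerThread.sync(5000);
	}
}
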
http://git-wip-us.apache.org/repos/asf/flink/blob/c8161911/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java
new file mode 100644
index 0000000..ff06187
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.benchmark;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionLocation;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+
+import static org.apache.flink.util.ExceptionUtils.suppressExceptions;
+
+/**
+ * Context for network benchmarks executed by the external
+ * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project.
+ */
+public class NetworkBenchmarkEnvironment<T extends IOReadableWritable> {
+
+	private static final int BUFFER_SIZE = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue();
+
+	private static final int NUM_SLOTS_AND_THREADS = 1;
+
+	private static final InetAddress LOCAL_ADDRESS;
+
+	static {
+		try {
+			LOCAL_ADDRESS = InetAddress.getLocalHost();
+		} catch (UnknownHostException e) {
+			throw new Error(e);
+		}
+	}
+
+	protected final JobID jobId = new JobID();
+	protected final IntermediateDataSetID dataSetID = new IntermediateDataSetID();
+	protected final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
+
+	protected NetworkEnvironment senderEnv;
+	protected NetworkEnvironment receiverEnv;
+	protected IOManager ioManager;
+
+	protected int channels;
+
+	protected ResultPartitionID[] partitionIds;
+
+	public void setUp(int writers, int channels) throws Exception {
+		this.channels = channels;
+		this.partitionIds = new ResultPartitionID[writers];
+
+		int bufferPoolSize = Math.max(2048, writers * channels * 4);
+		senderEnv = createNettyNetworkEnvironment(bufferPoolSize);
+		receiverEnv = createNettyNetworkEnvironment(bufferPoolSize);
+		ioManager = new IOManagerAsync();
+
+		senderEnv.start();
+		receiverEnv.start();
+
+		generatePartitionIds();
+	}
+
+	public void tearDown() {
+		suppressExceptions(senderEnv::shutdown);
+		suppressExceptions(receiverEnv::shutdown);
+		suppressExceptions(ioManager::shutdown);
+	}
+
+	public SerializingLongReceiver createReceiver() throws Exception {
+		TaskManagerLocation senderLocation = new TaskManagerLocation(
+			ResourceID.generate(),
+			LOCAL_ADDRESS,
+			senderEnv.getConnectionManager().getDataPort());
+
+		InputGate receiverGate = createInputGate(
+			jobId,
+			dataSetID,
+			executionAttemptID,
+			senderLocation,
+			receiverEnv,
+			channels);
+
+		SerializingLongReceiver receiver = new SerializingLongReceiver(receiverGate, channels * partitionIds.length);
+
+		receiver.start();
+		return receiver;
+	}
+
+	public RecordWriter<T> createRecordWriter(int partitionIndex) throws Exception {
+		ResultPartitionWriter sender = createResultPartition(jobId, partitionIds[partitionIndex], senderEnv, channels);
+		return new RecordWriter<>(sender);
+	}
+
+	private void generatePartitionIds() throws Exception {
+		for (int writer = 0; writer < partitionIds.length; writer++) {
+			partitionIds[writer] = new ResultPartitionID();
+		}
+	}
+
+	private NetworkEnvironment createNettyNetworkEnvironment(
+			@SuppressWarnings("SameParameterValue") int bufferPoolSize) throws Exception {
+
+		final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, BUFFER_SIZE);
+
+		final NettyConnectionManager nettyConnectionManager = new NettyConnectionManager(
+			new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, NUM_SLOTS_AND_THREADS, new Configuration()));
+
+		return new NetworkEnvironment(
+			bufferPool,
+			nettyConnectionManager,
+			new ResultPartitionManager(),
+			new TaskEventDispatcher(),
+			new KvStateRegistry(),
+			null,
+			null,
+			IOMode.SYNC,
+			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
+			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
+			TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(),
+			TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue());
+	}
+
+	protected ResultPartitionWriter createResultPartition(
+			JobID jobId,
+			ResultPartitionID partitionId,
+			NetworkEnvironment environment,
+			int channels) throws Exception {
+
+		ResultPartition resultPartition = new ResultPartition(
+			"sender task",
+			new NoOpTaskActions(),
+			jobId,
+			partitionId,
+			ResultPartitionType.PIPELINED_BOUNDED,
+			channels,
+			1,
+			environment.getResultPartitionManager(),
+			new NoOpResultPartitionConsumableNotifier(),
+			ioManager,
+			false);
+
+		// similar to NetworkEnvironment#registerTask()
+		int numBuffers = resultPartition.getNumberOfSubpartitions() *
+			TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() +
+			TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue();
+
+		BufferPool bufferPool = environment.getNetworkBufferPool().createBufferPool(channels, numBuffers);
+		resultPartition.registerBufferPool(bufferPool);
+
+		environment.getResultPartitionManager().registerResultPartition(resultPartition);
+
+		return resultPartition;
+	}
+
+	private InputGate createInputGate(
+			JobID jobId,
+			IntermediateDataSetID dataSetID,
+			ExecutionAttemptID executionAttemptID,
+			final TaskManagerLocation senderLocation,
+			NetworkEnvironment environment,
+			final int channels) throws IOException {
+
+		InputGate[] gates = new InputGate[channels];
+		for (int channel = 0; channel < channels; ++channel) {
+			int finalChannel = channel;
+			InputChannelDeploymentDescriptor[] channelDescriptors = Arrays.stream(partitionIds)
+				.map(partitionId -> new InputChannelDeploymentDescriptor(
+					partitionId,
+					ResultPartitionLocation.createRemote(new ConnectionID(senderLocation, finalChannel))))
+				.toArray(InputChannelDeploymentDescriptor[]::new);
+
+			final InputGateDeploymentDescriptor gateDescriptor = new InputGateDeploymentDescriptor(
+				dataSetID,
+				ResultPartitionType.PIPELINED_BOUNDED,
+				channel,
+				channelDescriptors);
+
+			SingleInputGate gate = SingleInputGate.create(
+				"receiving task[" + channel + "]",
+				jobId,
+				executionAttemptID,
+				gateDescriptor,
+				environment,
+				new NoOpTaskActions(),
+				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+
+			// similar to NetworkEnvironment#registerTask()
+			int numBuffers = gate.getNumberOfInputChannels() *
+				TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() +
+				TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue();
+
+			BufferPool bufferPool =
+				environment.getNetworkBufferPool().createBufferPool(gate.getNumberOfInputChannels(), numBuffers);
+
+			gate.setBufferPool(bufferPool);
+			gates[channel] = gate;
+		}
+
+		if (channels > 1) {
+			return new UnionInputGate(gates);
+		} else {
+			return gates[0];
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Mocks
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A dummy implementation of {@link TaskActions}. We implement it here rather than mocking it
+	 * to avoid a Mockito dependency in this benchmark class.
+	 */
+	private static final class NoOpTaskActions implements TaskActions {
+
+		@Override
+		public void triggerPartitionProducerStateCheck(
+			JobID jobId,
+			IntermediateDataSetID intermediateDataSetId,
+			ResultPartitionID resultPartitionId) {}
+
+		@Override
+		public void failExternally(Throwable cause) {}
+	}
+
+	private static final class NoOpResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
+
+		@Override
+		public void notifyPartitionConsumable(JobID j, ResultPartitionID p, TaskActions t) {}
+	}
+}

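For orientation, the basic wiring this environment provides, using only the public methods above (illustrative sketch, not part of this commit; it mirrors what NetworkThroughputBenchmark#setUp in the next file does, and the sketch class name is made up):

package org.apache.flink.runtime.io.network.benchmark;

import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.types.LongValue;

/**
 * Hypothetical wiring sketch, not part of this commit.
 */
public class NetworkBenchmarkEnvironmentSketch {

	static void wireUp() throws Exception {
		NetworkBenchmarkEnvironment<LongValue> environment = new NetworkBenchmarkEnvironment<>();
		environment.setUp(1, 1); // one RecordWriter, one channel

		SerializingLongReceiver receiver = environment.createReceiver(); // also starts the receiver thread
		RecordWriter<LongValue> writer = environment.createRecordWriter(0); // handed to a writer thread in the real benchmark

		// drive writer and receiver here, e.g. via LongRecordWriterThread above

		receiver.shutdown();
		environment.tearDown();
	}
}
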
http://git-wip-us.apache.org/repos/asf/flink/blob/c8161911/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java
new file mode 100644
index 0000000..799b7c3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.benchmark;
+
+import org.apache.flink.types.LongValue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Network throughput benchmarks executed by the external
+ * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project.
+ */
+public class NetworkThroughputBenchmark {
+	private static final long RECEIVER_TIMEOUT = 30_000;
+
+	private NetworkBenchmarkEnvironment<LongValue> environment;
+	private ReceiverThread receiver;
+	private LongRecordWriterThread[] writerThreads;
+
+	/**
+	 * Executes the throughput benchmark with the given number of records.
+	 *
+	 * @param records
+	 * 		records to pass through the network stack
+	 */
+	public void executeBenchmark(long records) throws Exception {
+		final LongValue value = new LongValue();
+		value.setValue(0);
+
+		long lastRecord = records / writerThreads.length;
+		CompletableFuture<?> recordsReceived = receiver.setExpectedRecord(lastRecord);
+
+		for (LongRecordWriterThread writerThread : writerThreads) {
+			writerThread.setRecordsToSend(lastRecord);
+		}
+
+		recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS);
+	}
+
+	/**
+	 * Initializes the throughput benchmark with the given parameters.
+	 *
+	 * @param recordWriters
+	 * 		number of senders, i.e.
+	 * 		{@link org.apache.flink.runtime.io.network.api.writer.RecordWriter} instances
+	 * @param channels
+	 * 		number of outgoing channels / receivers
+	 */
+	public void setUp(int recordWriters, int channels) throws Exception {
+		environment = new NetworkBenchmarkEnvironment<>();
+		environment.setUp(recordWriters, channels);
+		receiver = environment.createReceiver();
+		writerThreads = new LongRecordWriterThread[recordWriters];
+		for (int writer = 0; writer < recordWriters; writer++) {
+			writerThreads[writer] = new LongRecordWriterThread(environment.createRecordWriter(writer));
+			writerThreads[writer].start();
+		}
+	}
+
+	/**
+	 * Shuts down a benchmark previously set up via {@link #setUp}.
+	 *
+	 * <p>This will wait for all senders to finish, timing out with an exception after 5 seconds.
+	 */
+	public void tearDown() throws Exception {
+		for (LongRecordWriterThread writerThread : writerThreads) {
+			writerThread.shutdown();
+			writerThread.sync(5000);
+		}
+		environment.tearDown();
+		receiver.shutdown();
+	}
+}

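Assuming the external flink-benchmarks project referenced above is JMH based, a driver around this class might look roughly as follows (illustrative sketch only; the class name, parameters, and record count are assumptions, not part of this commit):

// Hypothetical JMH harness as it might appear in the external flink-benchmarks project.

import org.apache.flink.runtime.io.network.benchmark.NetworkThroughputBenchmark;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;

@State(Scope.Thread)
public class NetworkThroughputBenchmarkDriver {

	private static final int RECORDS_PER_INVOCATION = 1_000_000;

	private final NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark();

	@Setup
	public void setUp() throws Exception {
		// 4 RecordWriters fanning out over 100 channels, as in multiPointToMultiPointBenchmark below.
		benchmark.setUp(4, 100);
	}

	@TearDown
	public void tearDown() throws Exception {
		benchmark.tearDown();
	}

	@Benchmark
	@OperationsPerInvocation(RECORDS_PER_INVOCATION)
	public void networkThroughput() throws Exception {
		benchmark.executeBenchmark(RECORDS_PER_INVOCATION);
	}
}
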
http://git-wip-us.apache.org/repos/asf/flink/blob/c8161911/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java
new file mode 100644
index 0000000..c84743b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.benchmark;
+
+import org.junit.Test;
+
+/**
+ * Tests for various network benchmarks based on {@link NetworkThroughputBenchmark}.
+ */
+public class NetworkThroughputBenchmarkTests {
+	@Test
+	public void pointToPointBenchmark() throws Exception {
+		NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark();
+		benchmark.setUp(1, 1);
+		try {
+			benchmark.executeBenchmark(1_000);
+		}
+		finally {
+			benchmark.tearDown();
+		}
+	}
+
+	@Test
+	public void pointToMultiPointBenchmark() throws Exception {
+		NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark();
+		benchmark.setUp(1, 100);
+		try {
+			benchmark.executeBenchmark(1_000);
+		}
+		finally {
+			benchmark.tearDown();
+		}
+	}
+
+	@Test
+	public void multiPointToPointBenchmark() throws Exception {
+		NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark();
+		benchmark.setUp(4, 1);
+		try {
+			benchmark.executeBenchmark(1_000);
+		}
+		finally {
+			benchmark.tearDown();
+		}
+	}
+
+	@Test
+	public void multiPointToMultiPointBenchmark() throws Exception {
+		NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark();
+		benchmark.setUp(4, 100);
+		try {
+			benchmark.executeBenchmark(1_000);
+		}
+		finally {
+			benchmark.tearDown();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c8161911/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java
new file mode 100644
index 0000000..be1c80f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.benchmark;
+
+import org.apache.flink.core.testutils.CheckedThread;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This class waits for {@code expectedRepetitionsOfExpectedRecord} occurrences of the
+ * {@code expectedRecord}. {@code expectedRepetitionsOfExpectedRecord} is correlated with the number of input channels.
+ */
+public abstract class ReceiverThread extends CheckedThread {
+	protected static final Logger LOG = LoggerFactory.getLogger(ReceiverThread.class);
+
+	protected final int expectedRepetitionsOfExpectedRecord;
+
+	protected int expectedRecordCounter;
+	protected CompletableFuture<Long> expectedRecord = new CompletableFuture<>();
+	protected CompletableFuture<?> recordsProcessed = new CompletableFuture<>();
+
+	protected volatile boolean running;
+
+	ReceiverThread(int expectedRepetitionsOfExpectedRecord) {
+		setName(this.getClass().getName());
+
+		this.expectedRepetitionsOfExpectedRecord = expectedRepetitionsOfExpectedRecord;
+		this.running = true;
+	}
+
+	public synchronized CompletableFuture<?> setExpectedRecord(long record) {
+		checkState(!expectedRecord.isDone());
+		checkState(!recordsProcessed.isDone());
+		expectedRecord.complete(record);
+		expectedRecordCounter = 0;
+		return recordsProcessed;
+	}
+
+	private synchronized CompletableFuture<Long> getExpectedRecord() {
+		return expectedRecord;
+	}
+
+	private synchronized void finishProcessingExpectedRecords() {
+		checkState(expectedRecord.isDone());
+		checkState(!recordsProcessed.isDone());
+
+		recordsProcessed.complete(null);
+		expectedRecord = new CompletableFuture<>();
+		recordsProcessed = new CompletableFuture<>();
+	}
+
+	@Override
+	public void go() throws Exception {
+		try {
+			while (running) {
+				readRecords(getExpectedRecord().get());
+				finishProcessingExpectedRecords();
+			}
+		}
+		catch (InterruptedException e) {
+			if (running) {
+				throw e;
+			}
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+	}
+
+	protected abstract void readRecords(long lastExpectedRecord) throws Exception;
+
+	public void shutdown() {
+		running = false;
+		interrupt();
+		expectedRecord.complete(0L);
+	}
+
+}

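The driver-side handshake against this class, in isolation (illustrative sketch, not part of this commit; it is the same pattern NetworkThroughputBenchmark#executeBenchmark above uses, and the sketch class name is made up):

package org.apache.flink.runtime.io.network.benchmark;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical handshake sketch, not part of this commit.
 */
public class ReceiverThreadSketch {

	static void awaitOneRound(ReceiverThread receiver, long lastRecord) throws Exception {
		// Arms the receiver: readRecords(lastRecord) runs until the expected record has been
		// seen expectedRepetitionsOfExpectedRecord times, after which the returned future completes.
		CompletableFuture<?> done = receiver.setExpectedRecord(lastRecord);

		// senders emit their records concurrently here

		done.get(30_000, TimeUnit.MILLISECONDS);
	}
}
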
http://git-wip-us.apache.org/repos/asf/flink/blob/c8161911/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java
new file mode 100644
index 0000000..848c018
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.benchmark;
+
+import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.types.LongValue;
+
+/**
+ * {@link ReceiverThread} that deserializes incoming messages.
+ */
+public class SerializingLongReceiver extends ReceiverThread {
+
+	private final MutableRecordReader<LongValue> reader;
+
+	@SuppressWarnings("WeakerAccess")
+	public SerializingLongReceiver(InputGate inputGate, int expectedRepetitionsOfExpectedRecord) {
+		super(expectedRepetitionsOfExpectedRecord);
+		this.reader = new MutableRecordReader<>(
+			inputGate,
+			new String[]{
+				EnvironmentInformation.getTemporaryFileDirectory()
+			});
+	}
+
+	protected void readRecords(long lastExpectedRecord) throws Exception {
+		LOG.debug("readRecords(lastExpectedRecord = {})", lastExpectedRecord);
+		final LongValue value = new LongValue();
+
+		while (running && reader.next(value)) {
+			final long ts = value.getValue();
+			if (ts == lastExpectedRecord) {
+				expectedRecordCounter++;
+				if (expectedRecordCounter == expectedRepetitionsOfExpectedRecord) {
+					break;
+				}
+			}
+		}
+	}
+}