You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/08 13:04:08 UTC
[14/15] flink git commit: [FLINK-8220][network-benchmarks] Define network benchmarks in Flink project
[FLINK-8220][network-benchmarks] Define network benchmarks in Flink project
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c8161911
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c8161911
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c8161911
Branch: refs/heads/master
Commit: c816191113d813156467f3e33856636ef0bcce38
Parents: 81d3e72
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Thu Dec 7 10:03:32 2017 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Mon Jan 8 11:46:01 2018 +0100
----------------------------------------------------------------------
.../benchmark/LongRecordWriterThread.java | 94 +++++++
.../benchmark/NetworkBenchmarkEnvironment.java | 278 +++++++++++++++++++
.../benchmark/NetworkThroughputBenchmark.java | 90 ++++++
.../NetworkThroughputBenchmarkTests.java | 74 +++++
.../io/network/benchmark/ReceiverThread.java | 98 +++++++
.../benchmark/SerializingLongReceiver.java | 57 ++++
6 files changed, 691 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c8161911/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java
new file mode 100644
index 0000000..6018e55
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.benchmark;
+
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.types.LongValue;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Wrapping thread around {@link RecordWriter} that sends a fixed number of <tt>LongValue(0)</tt>
+ * records.
+ */
+public class LongRecordWriterThread extends CheckedThread {
+    private final RecordWriter<LongValue> recordWriter;
+
+    /**
+     * Future to wait on a definition of the number of records to send.
+     *
+     * <p>Replaced by a fresh instance after every batch; every access goes through a method
+     * synchronized on this object.
+     */
+    private CompletableFuture<Long> recordsToSend = new CompletableFuture<>();
+
+    /** Cleared by {@link #shutdown()} so that {@link #go()} leaves its loop. */
+    private volatile boolean running = true;
+
+    /**
+     * Creates a writer thread that emits through the given record writer.
+     *
+     * @param recordWriter
+     *         writer used to emit the records, must not be <tt>null</tt>
+     */
+    public LongRecordWriterThread(RecordWriter<LongValue> recordWriter) {
+        this.recordWriter = checkNotNull(recordWriter);
+    }
+
+    /**
+     * Requests the thread to stop.
+     *
+     * <p>A thread that is still waiting for a record count is released with a count of
+     * <tt>0</tt>. The method is synchronized so that it always completes the current
+     * {@link #recordsToSend} future rather than a stale one that was already replaced by
+     * {@link #finishSendingRecords()}.
+     */
+    public synchronized void shutdown() {
+        running = false;
+        recordsToSend.complete(0L);
+    }
+
+    /**
+     * Initializes the record writer thread with this many numbers to send.
+     *
+     * <p>If the thread was already started, it may now continue.
+     *
+     * @param records
+     *         number of records to send
+     * @throws IllegalStateException
+     *         if a record count was already set and the batch is not finished yet
+     */
+    public synchronized void setRecordsToSend(long records) {
+        checkState(!recordsToSend.isDone());
+        recordsToSend.complete(records);
+    }
+
+    /** Returns the future holding the size of the next batch. */
+    private synchronized CompletableFuture<Long> getRecordsToSend() {
+        return recordsToSend;
+    }
+
+    /** Installs a fresh future so that the next batch size can be set. */
+    private synchronized void finishSendingRecords() {
+        recordsToSend = new CompletableFuture<>();
+    }
+
+    /**
+     * Waits for a record count, sends that batch and repeats until {@link #shutdown()} is
+     * called.
+     */
+    @Override
+    public void go() throws Exception {
+        while (running) {
+            // get() blocks outside of the monitor, so setRecordsToSend()/shutdown() can proceed
+            sendRecords(getRecordsToSend().get());
+        }
+    }
+
+    /**
+     * Sends one batch: <tt>records - 1</tt> records with value <tt>0</tt> via
+     * {@link RecordWriter#emit}, followed by one record carrying the value <tt>records</tt>
+     * via {@link RecordWriter#broadcastEmit}, and finally a flush.
+     *
+     * @param records
+     *         size of the batch; also the value of the last record
+     */
+    private void sendRecords(long records) throws IOException, InterruptedException {
+        LongValue value = new LongValue(0);
+
+        // the counter must be a long: an int would overflow for records > Integer.MAX_VALUE
+        // and the loop would never terminate
+        for (long i = 1; i < records; i++) {
+            recordWriter.emit(value);
+        }
+        value.setValue(records);
+        recordWriter.broadcastEmit(value);
+        recordWriter.flush();
+
+        finishSendingRecords();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c8161911/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java
new file mode 100644
index 0000000..ff06187
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.benchmark;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionLocation;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+
+import static org.apache.flink.util.ExceptionUtils.suppressExceptions;
+
+/**
+ * Context for network benchmarks executed by the external
+ * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project.
+ */
+public class NetworkBenchmarkEnvironment<T extends IOReadableWritable> {
+
+ /** Buffer size handed to both the {@link NetworkBufferPool} and the {@link NettyConfig}. */
+ private static final int BUFFER_SIZE = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue();
+
+ /** Value passed as the fourth argument of the {@link NettyConfig} constructor. */
+ private static final int NUM_SLOTS_AND_THREADS = 1;
+
+ /** Address of the local host, resolved once when this class is initialized. */
+ private static final InetAddress LOCAL_ADDRESS;
+
+ static {
+ try {
+ LOCAL_ADDRESS = InetAddress.getLocalHost();
+ } catch (UnknownHostException e) {
+ // LOCAL_ADDRESS is needed by every environment created below, so give up right away
+ throw new Error(e);
+ }
+ }
+
+ // IDs shared by the sending and the receiving side of this environment
+ protected final JobID jobId = new JobID();
+ protected final IntermediateDataSetID dataSetID = new IntermediateDataSetID();
+ protected final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
+
+ // created in setUp() and shut down in tearDown()
+ protected NetworkEnvironment senderEnv;
+ protected NetworkEnvironment receiverEnv;
+ protected IOManager ioManager;
+
+ /** Number of channels as given to {@link #setUp(int, int)}. */
+ protected int channels;
+
+ /** One partition ID per writer, filled by {@link #generatePartitionIds()}. */
+ protected ResultPartitionID[] partitionIds;
+
+ /**
+ * Creates and starts a sender and a receiver {@link NetworkEnvironment}, creates the
+ * {@link IOManager} and generates one partition ID per writer.
+ *
+ * @param writers
+ * number of writers, i.e. number of result partitions
+ * @param channels
+ * number of channels per writer
+ */
+ public void setUp(int writers, int channels) throws Exception {
+ this.channels = channels;
+ this.partitionIds = new ResultPartitionID[writers];
+
+ // at least 2048 buffers per environment, more for large writer/channel combinations
+ int bufferPoolSize = Math.max(2048, writers * channels * 4);
+ senderEnv = createNettyNetworkEnvironment(bufferPoolSize);
+ receiverEnv = createNettyNetworkEnvironment(bufferPoolSize);
+ ioManager = new IOManagerAsync();
+
+ senderEnv.start();
+ receiverEnv.start();
+
+ generatePartitionIds();
+ }
+
+ /**
+ * Shuts down both network environments and the I/O manager, each call wrapped in
+ * {@code suppressExceptions}.
+ */
+ public void tearDown() {
+ suppressExceptions(senderEnv::shutdown);
+ suppressExceptions(receiverEnv::shutdown);
+ suppressExceptions(ioManager::shutdown);
+ }
+
+ /**
+ * Creates an input gate in the receiver environment that points at the sender environment's
+ * data port, wraps it into a {@link SerializingLongReceiver} and starts that thread.
+ *
+ * @return the already started receiver thread
+ */
+ public SerializingLongReceiver createReceiver() throws Exception {
+ TaskManagerLocation senderLocation = new TaskManagerLocation(
+ ResourceID.generate(),
+ LOCAL_ADDRESS,
+ senderEnv.getConnectionManager().getDataPort());
+
+ InputGate receiverGate = createInputGate(
+ jobId,
+ dataSetID,
+ executionAttemptID,
+ senderLocation,
+ receiverEnv,
+ channels);
+
+ // the receiver waits for the expected record once per channel of every writer
+ SerializingLongReceiver receiver = new SerializingLongReceiver(receiverGate, channels * partitionIds.length);
+
+ receiver.start();
+ return receiver;
+ }
+
+ /**
+ * Creates a {@link RecordWriter} on top of a new result partition in the sender environment.
+ *
+ * @param partitionIndex
+ * index into {@link #partitionIds} selecting the partition ID to use
+ */
+ public RecordWriter<T> createRecordWriter(int partitionIndex) throws Exception {
+ ResultPartitionWriter sender = createResultPartition(jobId, partitionIds[partitionIndex], senderEnv, channels);
+ return new RecordWriter<>(sender);
+ }
+
+ /** Fills every slot of {@link #partitionIds} with a new {@link ResultPartitionID}. */
+ private void generatePartitionIds() throws Exception {
+ for (int writer = 0; writer < partitionIds.length; writer++) {
+ partitionIds[writer] = new ResultPartitionID();
+ }
+ }
+
+ /**
+ * Creates a (not yet started) {@link NetworkEnvironment} backed by a
+ * {@link NettyConnectionManager} bound to {@link #LOCAL_ADDRESS}.
+ *
+ * @param bufferPoolSize
+ * number of buffers of size {@link #BUFFER_SIZE} in the environment's buffer pool
+ */
+ private NetworkEnvironment createNettyNetworkEnvironment(
+ @SuppressWarnings("SameParameterValue") int bufferPoolSize) throws Exception {
+
+ final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, BUFFER_SIZE);
+
+ // NOTE(review): port 0 presumably lets the system pick a free port - confirm against NettyConfig
+ final NettyConnectionManager nettyConnectionManager = new NettyConnectionManager(
+ new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, NUM_SLOTS_AND_THREADS, new Configuration()));
+
+ return new NetworkEnvironment(
+ bufferPool,
+ nettyConnectionManager,
+ new ResultPartitionManager(),
+ new TaskEventDispatcher(),
+ new KvStateRegistry(),
+ null,
+ null,
+ IOMode.SYNC,
+ TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
+ TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
+ TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(),
+ TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue());
+ }
+
+ /**
+ * Creates a {@link ResultPartitionType#PIPELINED_BOUNDED} result partition, gives it its own
+ * buffer pool and registers it at the environment's {@link ResultPartitionManager}.
+ *
+ * @param jobId
+ * job the partition belongs to
+ * @param partitionId
+ * ID of the new partition
+ * @param environment
+ * environment providing the buffer pool and the partition manager
+ * @param channels
+ * value passed to the {@link ResultPartition} constructor after the partition type; also
+ * the first argument of {@code createBufferPool}
+ * @return the registered result partition
+ */
+ protected ResultPartitionWriter createResultPartition(
+ JobID jobId,
+ ResultPartitionID partitionId,
+ NetworkEnvironment environment,
+ int channels) throws Exception {
+
+ ResultPartition resultPartition = new ResultPartition(
+ "sender task",
+ new NoOpTaskActions(),
+ jobId,
+ partitionId,
+ ResultPartitionType.PIPELINED_BOUNDED,
+ channels,
+ 1,
+ environment.getResultPartitionManager(),
+ new NoOpResultPartitionConsumableNotifier(),
+ ioManager,
+ false);
+
+ // similar to NetworkEnvironment#registerTask()
+ int numBuffers = resultPartition.getNumberOfSubpartitions() *
+ TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() +
+ TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue();
+
+ BufferPool bufferPool = environment.getNetworkBufferPool().createBufferPool(channels, numBuffers);
+ resultPartition.registerBufferPool(bufferPool);
+
+ environment.getResultPartitionManager().registerResultPartition(resultPartition);
+
+ return resultPartition;
+ }
+
+ /**
+ * Creates one {@link SingleInputGate} per channel, each with one remote input channel per
+ * entry of {@link #partitionIds}, and gives every gate its own buffer pool.
+ *
+ * @return the single gate if {@code channels} is 1, otherwise a {@link UnionInputGate} over
+ * all created gates
+ */
+ private InputGate createInputGate(
+ JobID jobId,
+ IntermediateDataSetID dataSetID,
+ ExecutionAttemptID executionAttemptID,
+ final TaskManagerLocation senderLocation,
+ NetworkEnvironment environment,
+ final int channels) throws IOException {
+
+ InputGate[] gates = new InputGate[channels];
+ for (int channel = 0; channel < channels; ++channel) {
+ // effectively final copy of the loop variable for use inside the lambda
+ int finalChannel = channel;
+ InputChannelDeploymentDescriptor[] channelDescriptors = Arrays.stream(partitionIds)
+ .map(partitionId -> new InputChannelDeploymentDescriptor(
+ partitionId,
+ ResultPartitionLocation.createRemote(new ConnectionID(senderLocation, finalChannel))))
+ .toArray(InputChannelDeploymentDescriptor[]::new);
+
+ // the channel number is also handed to the gate descriptor
+ final InputGateDeploymentDescriptor gateDescriptor = new InputGateDeploymentDescriptor(
+ dataSetID,
+ ResultPartitionType.PIPELINED_BOUNDED,
+ channel,
+ channelDescriptors);
+
+ SingleInputGate gate = SingleInputGate.create(
+ "receiving task[" + channel + "]",
+ jobId,
+ executionAttemptID,
+ gateDescriptor,
+ environment,
+ new NoOpTaskActions(),
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+
+ // similar to NetworkEnvironment#registerTask()
+ int numBuffers = gate.getNumberOfInputChannels() *
+ TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() +
+ TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue();
+
+ BufferPool bufferPool =
+ environment.getNetworkBufferPool().createBufferPool(gate.getNumberOfInputChannels(), numBuffers);
+
+ gate.setBufferPool(bufferPool);
+ gates[channel] = gate;
+ }
+
+ // a union gate is only needed when there is more than one gate to read from
+ if (channels > 1) {
+ return new UnionInputGate(gates);
+ } else {
+ return gates[0];
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Mocks
+ // ------------------------------------------------------------------------
+
+ /**
+ * A dummy implementation of the {@link TaskActions}. We implement this here rather than using Mockito
+ * to avoid using mockito in this benchmark class.
+ */
+ private static final class NoOpTaskActions implements TaskActions {
+
+ @Override
+ public void triggerPartitionProducerStateCheck(
+ JobID jobId,
+ IntermediateDataSetID intermediateDataSetId,
+ ResultPartitionID resultPartitionId) {}
+
+ @Override
+ public void failExternally(Throwable cause) {}
+ }
+
+ /**
+ * A {@link ResultPartitionConsumableNotifier} that ignores all notifications.
+ */
+ private static final class NoOpResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
+
+ @Override
+ public void notifyPartitionConsumable(JobID j, ResultPartitionID p, TaskActions t) {}
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c8161911/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java
new file mode 100644
index 0000000..799b7c3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.benchmark;
+
+import org.apache.flink.types.LongValue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Network throughput benchmarks executed by the external
+ * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project.
+ */
+public class NetworkThroughputBenchmark {
+    /** Maximum time in milliseconds to wait for the receiver to see all expected records. */
+    private static final long RECEIVER_TIMEOUT = 30_000;
+
+    private NetworkBenchmarkEnvironment<LongValue> environment;
+    private ReceiverThread receiver;
+    private LongRecordWriterThread[] writerThreads;
+
+    /**
+     * Executes the throughput benchmark with the given number of records.
+     *
+     * <p>The records are split evenly among the writer threads (integer division, a remainder
+     * is dropped). The call returns once the receiver has completed the future returned by
+     * {@link ReceiverThread#setExpectedRecord(long)}.
+     *
+     * @param records
+     *         records to pass through the network stack
+     * @throws java.util.concurrent.TimeoutException
+     *         if the receiver does not finish within {@link #RECEIVER_TIMEOUT} milliseconds
+     */
+    public void executeBenchmark(long records) throws Exception {
+        // every writer sends this many records and marks its last one with this value
+        long lastRecord = records / writerThreads.length;
+        CompletableFuture<?> recordsReceived = receiver.setExpectedRecord(lastRecord);
+
+        for (LongRecordWriterThread writerThread : writerThreads) {
+            writerThread.setRecordsToSend(lastRecord);
+        }
+
+        recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Initializes the throughput benchmark with the given parameters.
+     *
+     * <p>The receiver and all writer threads are started here; the writers then wait until
+     * {@link #executeBenchmark(long)} tells them how many records to send.
+     *
+     * @param recordWriters
+     *         number of senders, i.e.
+     *         {@link org.apache.flink.runtime.io.network.api.writer.RecordWriter} instances
+     * @param channels
+     *         number of outgoing channels / receivers
+     */
+    public void setUp(int recordWriters, int channels) throws Exception {
+        environment = new NetworkBenchmarkEnvironment<>();
+        environment.setUp(recordWriters, channels);
+        receiver = environment.createReceiver();
+        writerThreads = new LongRecordWriterThread[recordWriters];
+        for (int writer = 0; writer < recordWriters; writer++) {
+            writerThreads[writer] = new LongRecordWriterThread(environment.createRecordWriter(writer));
+            writerThreads[writer].start();
+        }
+    }
+
+    /**
+     * Shuts down a benchmark previously set up via {@link #setUp}.
+     *
+     * <p>This will wait for all senders to finish but timeout with an exception after 5 seconds
+     * per sender. The environment and the receiver are shut down even if waiting for a sender
+     * fails.
+     */
+    public void tearDown() throws Exception {
+        try {
+            for (LongRecordWriterThread writerThread : writerThreads) {
+                writerThread.shutdown();
+                writerThread.sync(5000);
+            }
+        } finally {
+            // must not be skipped when sync() throws, otherwise the network environments and
+            // the receiver thread are leaked
+            environment.tearDown();
+            receiver.shutdown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c8161911/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java
new file mode 100644
index 0000000..c84743b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.benchmark;
+
+import org.junit.Test;
+
+/**
+ * Tests for various network benchmarks based on {@link NetworkThroughputBenchmark}.
+ */
+public class NetworkThroughputBenchmarkTests {
+
+    /** One record writer, one channel. */
+    @Test
+    public void pointToPointBenchmark() throws Exception {
+        runBenchmark(1, 1);
+    }
+
+    /** One record writer, 100 channels. */
+    @Test
+    public void pointToMultiPointBenchmark() throws Exception {
+        runBenchmark(1, 100);
+    }
+
+    /** Four record writers, one channel. */
+    @Test
+    public void multiPointToPointBenchmark() throws Exception {
+        runBenchmark(4, 1);
+    }
+
+    /** Four record writers, 100 channels. */
+    @Test
+    public void multiPointToMultiPointBenchmark() throws Exception {
+        runBenchmark(4, 100);
+    }
+
+    /**
+     * Sets up a fresh {@link NetworkThroughputBenchmark}, pushes 1000 records through it and
+     * tears it down again, also when the execution fails.
+     *
+     * @param recordWriters
+     *         number of record writers to set up
+     * @param channels
+     *         number of channels to set up
+     */
+    private static void runBenchmark(int recordWriters, int channels) throws Exception {
+        final NetworkThroughputBenchmark throughputBenchmark = new NetworkThroughputBenchmark();
+        throughputBenchmark.setUp(recordWriters, channels);
+        try {
+            throughputBenchmark.executeBenchmark(1_000);
+        } finally {
+            throughputBenchmark.tearDown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c8161911/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java
new file mode 100644
index 0000000..be1c80f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.benchmark;
+
+import org.apache.flink.core.testutils.CheckedThread;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This class waits for {@code expectedRepetitionsOfExpectedRecord} number of occurrences of the
+ * {@code expectedRecord}. {@code expectedRepetitionsOfExpectedRecord} is correlated with number of input channels.
+ */
+public abstract class ReceiverThread extends CheckedThread {
+    protected static final Logger LOG = LoggerFactory.getLogger(ReceiverThread.class);
+
+    /** How often the expected record has to be seen before a round counts as processed. */
+    protected final int expectedRepetitionsOfExpectedRecord;
+
+    /** Occurrences of the expected record seen so far; reset by {@link #setExpectedRecord(long)}. */
+    protected int expectedRecordCounter;
+
+    /** Completed with the record value to wait for; replaced after every round. */
+    protected CompletableFuture<Long> expectedRecord = new CompletableFuture<>();
+
+    /** Completed once a round has been processed; replaced after every round. */
+    protected CompletableFuture<?> recordsProcessed = new CompletableFuture<>();
+
+    /** Cleared by {@link #shutdown()} so that {@link #go()} leaves its loop. */
+    protected volatile boolean running;
+
+    /**
+     * Creates the receiver thread and names it after its concrete class.
+     *
+     * @param expectedRepetitionsOfExpectedRecord
+     *         number of occurrences of the expected record to wait for in each round
+     */
+    ReceiverThread(int expectedRepetitionsOfExpectedRecord) {
+        setName(this.getClass().getName());
+
+        this.expectedRepetitionsOfExpectedRecord = expectedRepetitionsOfExpectedRecord;
+        this.running = true;
+    }
+
+    /**
+     * Starts a new round by defining the record value the receiver has to wait for.
+     *
+     * @param record
+     *         value of the record to wait for
+     * @return future that is completed once the round has been processed
+     * @throws IllegalStateException
+     *         if the previous round has not been finished yet
+     */
+    public synchronized CompletableFuture<?> setExpectedRecord(long record) {
+        checkState(!expectedRecord.isDone());
+        checkState(!recordsProcessed.isDone());
+        // reset the counter before the receiver is released through the future, so that the
+        // receiver thread can never observe (or have overwritten) a stale count
+        expectedRecordCounter = 0;
+        expectedRecord.complete(record);
+        return recordsProcessed;
+    }
+
+    /** Returns the future holding the record value of the current round. */
+    private synchronized CompletableFuture<Long> getExpectedRecord() {
+        return expectedRecord;
+    }
+
+    /** Completes the current round and installs fresh futures for the next one. */
+    private synchronized void finishProcessingExpectedRecords() {
+        checkState(expectedRecord.isDone());
+        checkState(!recordsProcessed.isDone());
+
+        recordsProcessed.complete(null);
+        expectedRecord = new CompletableFuture<>();
+        recordsProcessed = new CompletableFuture<>();
+    }
+
+    /**
+     * Waits for an expected record value, reads records until the round is complete and
+     * repeats until {@link #shutdown()} is called.
+     *
+     * <p>Exceptions raised while the thread is still supposed to run are logged and rethrown;
+     * exceptions raised after {@link #shutdown()} are ignored.
+     */
+    @Override
+    public void go() throws Exception {
+        try {
+            while (running) {
+                // get() blocks outside of the monitor, so setExpectedRecord()/shutdown() can proceed
+                readRecords(getExpectedRecord().get());
+                finishProcessingExpectedRecords();
+            }
+        }
+        catch (InterruptedException e) {
+            // shutdown() interrupts this thread on purpose
+            if (running) {
+                throw e;
+            }
+        } catch (Exception e) {
+            if (running) {
+                // do not swallow real failures: report them via the logger and rethrow
+                LOG.error("Receiver thread failed.", e);
+                throw e;
+            }
+            LOG.debug("Ignoring exception raised while shutting down the receiver thread.", e);
+        }
+    }
+
+    /**
+     * Reads records until the round is complete.
+     *
+     * @param lastExpectedRecord
+     *         record value set via {@link #setExpectedRecord(long)}
+     */
+    protected abstract void readRecords(long lastExpectedRecord) throws Exception;
+
+    /**
+     * Requests the thread to stop: clears the running flag, interrupts the thread and releases
+     * it in case it waits for an expected record.
+     *
+     * <p>The method is synchronized so that it always completes the current
+     * {@link #expectedRecord} future rather than a stale one that was already replaced.
+     */
+    public synchronized void shutdown() {
+        running = false;
+        interrupt();
+        expectedRecord.complete(0L);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c8161911/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java
new file mode 100644
index 0000000..848c018
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.benchmark;
+
+import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.types.LongValue;
+
+/**
+ * {@link ReceiverThread} that deserializes incoming messages.
+ */
+public class SerializingLongReceiver extends ReceiverThread {
+
+    /** Reader deserializing {@link LongValue} records from the input gate. */
+    private final MutableRecordReader<LongValue> reader;
+
+    /**
+     * Creates a receiver reading from the given input gate.
+     *
+     * @param inputGate
+     *         gate to read the records from
+     * @param expectedRepetitionsOfExpectedRecord
+     *         number of occurrences of the expected record to wait for in each round
+     */
+    @SuppressWarnings("WeakerAccess")
+    public SerializingLongReceiver(InputGate inputGate, int expectedRepetitionsOfExpectedRecord) {
+        super(expectedRepetitionsOfExpectedRecord);
+        this.reader = new MutableRecordReader<>(
+            inputGate,
+            new String[]{
+                EnvironmentInformation.getTemporaryFileDirectory()
+            });
+    }
+
+    /**
+     * Reads records until the expected value was seen
+     * {@code expectedRepetitionsOfExpectedRecord} times, the reader has no more records or the
+     * thread is shut down.
+     *
+     * @param lastExpectedRecord
+     *         record value to count
+     */
+    @Override
+    protected void readRecords(long lastExpectedRecord) throws Exception {
+        LOG.debug("readRecords(lastExpectedRecord = {})", lastExpectedRecord);
+        final LongValue value = new LongValue();
+
+        while (running && reader.next(value)) {
+            // only occurrences of the expected value count towards completing the round
+            if (value.getValue() == lastExpectedRecord) {
+                expectedRecordCounter++;
+                if (expectedRecordCounter == expectedRepetitionsOfExpectedRecord) {
+                    break;
+                }
+            }
+        }
+    }
+}