You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/09 15:13:53 UTC
[2/6] flink git commit: [FLINK-8252][benchmarks] convert network
benchmarks to streaming benchmarks
[FLINK-8252][benchmarks] convert network benchmarks to streaming benchmarks
This allows us to use the output flushing interval as a parameter to evaluate,
too.
This closes #5259.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/544c9703
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/544c9703
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/544c9703
Branch: refs/heads/master
Commit: 544c9703d97668b8d4a952501756db52156ff2ef
Parents: 6cfb758
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Dec 14 17:30:19 2017 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Tue Jan 9 16:10:51 2018 +0100
----------------------------------------------------------------------
.../benchmark/LongRecordWriterThread.java | 94 -------
.../benchmark/NetworkBenchmarkEnvironment.java | 278 -------------------
.../benchmark/NetworkThroughputBenchmark.java | 90 ------
.../NetworkThroughputBenchmarkTests.java | 74 -----
.../io/network/benchmark/ReceiverThread.java | 98 -------
.../benchmark/SerializingLongReceiver.java | 57 ----
.../io/benchmark/LongRecordWriterThread.java | 94 +++++++
.../runtime/io/benchmark/ReceiverThread.java | 98 +++++++
.../io/benchmark/SerializingLongReceiver.java | 57 ++++
.../StreamNetworkBenchmarkEnvironment.java | 257 ++++++++++++++++-
.../StreamNetworkPointToPointBenchmark.java | 3 +-
.../StreamNetworkThroughputBenchmark.java | 90 ++++++
.../StreamNetworkThroughputBenchmarkTests.java | 74 +++++
13 files changed, 662 insertions(+), 702 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java
deleted file mode 100644
index 6018e55..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.benchmark;
-
-import org.apache.flink.core.testutils.CheckedThread;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.types.LongValue;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * Wrapping thread around {@link RecordWriter} that sends a fixed number of <tt>LongValue(0)</tt>
- * records.
- */
-public class LongRecordWriterThread extends CheckedThread {
- private final RecordWriter<LongValue> recordWriter;
-
- /**
- * Future to wait on a definition of the number of records to send.
- */
- private CompletableFuture<Long> recordsToSend = new CompletableFuture<>();
-
- private volatile boolean running = true;
-
- public LongRecordWriterThread(RecordWriter<LongValue> recordWriter) {
- this.recordWriter = checkNotNull(recordWriter);
- }
-
- public void shutdown() {
- running = false;
- recordsToSend.complete(0L);
- }
-
- /**
- * Initializes the record writer thread with this many numbers to send.
- *
- * <p>If the thread was already started, if may now continue.
- *
- * @param records
- * number of records to send
- */
- public synchronized void setRecordsToSend(long records) {
- checkState(!recordsToSend.isDone());
- recordsToSend.complete(records);
- }
-
- private synchronized CompletableFuture<Long> getRecordsToSend() {
- return recordsToSend;
- }
-
- private synchronized void finishSendingRecords() {
- recordsToSend = new CompletableFuture<>();
- }
-
- @Override
- public void go() throws Exception {
- while (running) {
- sendRecords(getRecordsToSend().get());
- }
- }
-
- private void sendRecords(long records) throws IOException, InterruptedException {
- LongValue value = new LongValue(0);
-
- for (int i = 1; i < records; i++) {
- recordWriter.emit(value);
- }
- value.setValue(records);
- recordWriter.broadcastEmit(value);
- recordWriter.flush();
-
- finishSendingRecords();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java
deleted file mode 100644
index ff06187..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.benchmark;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.ResultPartitionLocation;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.ConnectionID;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.NettyConfig;
-import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
-import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.taskmanager.TaskActions;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Arrays;
-
-import static org.apache.flink.util.ExceptionUtils.suppressExceptions;
-
-/**
- * Context for network benchmarks executed by the external
- * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project.
- */
-public class NetworkBenchmarkEnvironment<T extends IOReadableWritable> {
-
- private static final int BUFFER_SIZE = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue();
-
- private static final int NUM_SLOTS_AND_THREADS = 1;
-
- private static final InetAddress LOCAL_ADDRESS;
-
- static {
- try {
- LOCAL_ADDRESS = InetAddress.getLocalHost();
- } catch (UnknownHostException e) {
- throw new Error(e);
- }
- }
-
- protected final JobID jobId = new JobID();
- protected final IntermediateDataSetID dataSetID = new IntermediateDataSetID();
- protected final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
-
- protected NetworkEnvironment senderEnv;
- protected NetworkEnvironment receiverEnv;
- protected IOManager ioManager;
-
- protected int channels;
-
- protected ResultPartitionID[] partitionIds;
-
- public void setUp(int writers, int channels) throws Exception {
- this.channels = channels;
- this.partitionIds = new ResultPartitionID[writers];
-
- int bufferPoolSize = Math.max(2048, writers * channels * 4);
- senderEnv = createNettyNetworkEnvironment(bufferPoolSize);
- receiverEnv = createNettyNetworkEnvironment(bufferPoolSize);
- ioManager = new IOManagerAsync();
-
- senderEnv.start();
- receiverEnv.start();
-
- generatePartitionIds();
- }
-
- public void tearDown() {
- suppressExceptions(senderEnv::shutdown);
- suppressExceptions(receiverEnv::shutdown);
- suppressExceptions(ioManager::shutdown);
- }
-
- public SerializingLongReceiver createReceiver() throws Exception {
- TaskManagerLocation senderLocation = new TaskManagerLocation(
- ResourceID.generate(),
- LOCAL_ADDRESS,
- senderEnv.getConnectionManager().getDataPort());
-
- InputGate receiverGate = createInputGate(
- jobId,
- dataSetID,
- executionAttemptID,
- senderLocation,
- receiverEnv,
- channels);
-
- SerializingLongReceiver receiver = new SerializingLongReceiver(receiverGate, channels * partitionIds.length);
-
- receiver.start();
- return receiver;
- }
-
- public RecordWriter<T> createRecordWriter(int partitionIndex) throws Exception {
- ResultPartitionWriter sender = createResultPartition(jobId, partitionIds[partitionIndex], senderEnv, channels);
- return new RecordWriter<>(sender);
- }
-
- private void generatePartitionIds() throws Exception {
- for (int writer = 0; writer < partitionIds.length; writer++) {
- partitionIds[writer] = new ResultPartitionID();
- }
- }
-
- private NetworkEnvironment createNettyNetworkEnvironment(
- @SuppressWarnings("SameParameterValue") int bufferPoolSize) throws Exception {
-
- final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, BUFFER_SIZE);
-
- final NettyConnectionManager nettyConnectionManager = new NettyConnectionManager(
- new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, NUM_SLOTS_AND_THREADS, new Configuration()));
-
- return new NetworkEnvironment(
- bufferPool,
- nettyConnectionManager,
- new ResultPartitionManager(),
- new TaskEventDispatcher(),
- new KvStateRegistry(),
- null,
- null,
- IOMode.SYNC,
- TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
- TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
- TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(),
- TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue());
- }
-
- protected ResultPartitionWriter createResultPartition(
- JobID jobId,
- ResultPartitionID partitionId,
- NetworkEnvironment environment,
- int channels) throws Exception {
-
- ResultPartition resultPartition = new ResultPartition(
- "sender task",
- new NoOpTaskActions(),
- jobId,
- partitionId,
- ResultPartitionType.PIPELINED_BOUNDED,
- channels,
- 1,
- environment.getResultPartitionManager(),
- new NoOpResultPartitionConsumableNotifier(),
- ioManager,
- false);
-
- // similar to NetworkEnvironment#registerTask()
- int numBuffers = resultPartition.getNumberOfSubpartitions() *
- TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() +
- TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue();
-
- BufferPool bufferPool = environment.getNetworkBufferPool().createBufferPool(channels, numBuffers);
- resultPartition.registerBufferPool(bufferPool);
-
- environment.getResultPartitionManager().registerResultPartition(resultPartition);
-
- return resultPartition;
- }
-
- private InputGate createInputGate(
- JobID jobId,
- IntermediateDataSetID dataSetID,
- ExecutionAttemptID executionAttemptID,
- final TaskManagerLocation senderLocation,
- NetworkEnvironment environment,
- final int channels) throws IOException {
-
- InputGate[] gates = new InputGate[channels];
- for (int channel = 0; channel < channels; ++channel) {
- int finalChannel = channel;
- InputChannelDeploymentDescriptor[] channelDescriptors = Arrays.stream(partitionIds)
- .map(partitionId -> new InputChannelDeploymentDescriptor(
- partitionId,
- ResultPartitionLocation.createRemote(new ConnectionID(senderLocation, finalChannel))))
- .toArray(InputChannelDeploymentDescriptor[]::new);
-
- final InputGateDeploymentDescriptor gateDescriptor = new InputGateDeploymentDescriptor(
- dataSetID,
- ResultPartitionType.PIPELINED_BOUNDED,
- channel,
- channelDescriptors);
-
- SingleInputGate gate = SingleInputGate.create(
- "receiving task[" + channel + "]",
- jobId,
- executionAttemptID,
- gateDescriptor,
- environment,
- new NoOpTaskActions(),
- UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
-
- // similar to NetworkEnvironment#registerTask()
- int numBuffers = gate.getNumberOfInputChannels() *
- TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() +
- TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue();
-
- BufferPool bufferPool =
- environment.getNetworkBufferPool().createBufferPool(gate.getNumberOfInputChannels(), numBuffers);
-
- gate.setBufferPool(bufferPool);
- gates[channel] = gate;
- }
-
- if (channels > 1) {
- return new UnionInputGate(gates);
- } else {
- return gates[0];
- }
- }
-
- // ------------------------------------------------------------------------
- // Mocks
- // ------------------------------------------------------------------------
-
- /**
- * A dummy implementation of the {@link TaskActions}. We implement this here rather than using Mockito
- * to avoid using mockito in this benchmark class.
- */
- private static final class NoOpTaskActions implements TaskActions {
-
- @Override
- public void triggerPartitionProducerStateCheck(
- JobID jobId,
- IntermediateDataSetID intermediateDataSetId,
- ResultPartitionID resultPartitionId) {}
-
- @Override
- public void failExternally(Throwable cause) {}
- }
-
- private static final class NoOpResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
-
- @Override
- public void notifyPartitionConsumable(JobID j, ResultPartitionID p, TaskActions t) {}
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java
deleted file mode 100644
index 799b7c3..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.benchmark;
-
-import org.apache.flink.types.LongValue;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Network throughput benchmarks executed by the external
- * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project.
- */
-public class NetworkThroughputBenchmark {
- private static final long RECEIVER_TIMEOUT = 30_000;
-
- private NetworkBenchmarkEnvironment<LongValue> environment;
- private ReceiverThread receiver;
- private LongRecordWriterThread[] writerThreads;
-
- /**
- * Executes the throughput benchmark with the given number of records.
- *
- * @param records
- * records to pass through the network stack
- */
- public void executeBenchmark(long records) throws Exception {
- final LongValue value = new LongValue();
- value.setValue(0);
-
- long lastRecord = records / writerThreads.length;
- CompletableFuture<?> recordsReceived = receiver.setExpectedRecord(lastRecord);
-
- for (LongRecordWriterThread writerThread : writerThreads) {
- writerThread.setRecordsToSend(lastRecord);
- }
-
- recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS);
- }
-
- /**
- * Initializes the throughput benchmark with the given parameters.
- *
- * @param recordWriters
- * number of senders, i.e.
- * {@link org.apache.flink.runtime.io.network.api.writer.RecordWriter} instances
- * @param channels
- * number of outgoing channels / receivers
- */
- public void setUp(int recordWriters, int channels) throws Exception {
- environment = new NetworkBenchmarkEnvironment<>();
- environment.setUp(recordWriters, channels);
- receiver = environment.createReceiver();
- writerThreads = new LongRecordWriterThread[recordWriters];
- for (int writer = 0; writer < recordWriters; writer++) {
- writerThreads[writer] = new LongRecordWriterThread(environment.createRecordWriter(writer));
- writerThreads[writer].start();
- }
- }
-
- /**
- * Shuts down a benchmark previously set up via {@link #setUp}.
- *
- * <p>This will wait for all senders to finish but timeout with an exception after 5 seconds.
- */
- public void tearDown() throws Exception {
- for (LongRecordWriterThread writerThread : writerThreads) {
- writerThread.shutdown();
- writerThread.sync(5000);
- }
- environment.tearDown();
- receiver.shutdown();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java
deleted file mode 100644
index c84743b..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.benchmark;
-
-import org.junit.Test;
-
-/**
- * Tests for various network benchmarks based on {@link NetworkThroughputBenchmark}.
- */
-public class NetworkThroughputBenchmarkTests {
- @Test
- public void pointToPointBenchmark() throws Exception {
- NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark();
- benchmark.setUp(1, 1);
- try {
- benchmark.executeBenchmark(1_000);
- }
- finally {
- benchmark.tearDown();
- }
- }
-
- @Test
- public void pointToMultiPointBenchmark() throws Exception {
- NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark();
- benchmark.setUp(1, 100);
- try {
- benchmark.executeBenchmark(1_000);
- }
- finally {
- benchmark.tearDown();
- }
- }
-
- @Test
- public void multiPointToPointBenchmark() throws Exception {
- NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark();
- benchmark.setUp(4, 1);
- try {
- benchmark.executeBenchmark(1_000);
- }
- finally {
- benchmark.tearDown();
- }
- }
-
- @Test
- public void multiPointToMultiPointBenchmark() throws Exception {
- NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark();
- benchmark.setUp(4, 100);
- try {
- benchmark.executeBenchmark(1_000);
- }
- finally {
- benchmark.tearDown();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java
deleted file mode 100644
index be1c80f..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.benchmark;
-
-import org.apache.flink.core.testutils.CheckedThread;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * This class waits for {@code expectedRepetitionsOfExpectedRecord} number of occurrences of the
- * {@code expectedRecord}. {@code expectedRepetitionsOfExpectedRecord} is correlated with number of input channels.
- */
-public abstract class ReceiverThread extends CheckedThread {
- protected static final Logger LOG = LoggerFactory.getLogger(ReceiverThread.class);
-
- protected final int expectedRepetitionsOfExpectedRecord;
-
- protected int expectedRecordCounter;
- protected CompletableFuture<Long> expectedRecord = new CompletableFuture<>();
- protected CompletableFuture<?> recordsProcessed = new CompletableFuture<>();
-
- protected volatile boolean running;
-
- ReceiverThread(int expectedRepetitionsOfExpectedRecord) {
- setName(this.getClass().getName());
-
- this.expectedRepetitionsOfExpectedRecord = expectedRepetitionsOfExpectedRecord;
- this.running = true;
- }
-
- public synchronized CompletableFuture<?> setExpectedRecord(long record) {
- checkState(!expectedRecord.isDone());
- checkState(!recordsProcessed.isDone());
- expectedRecord.complete(record);
- expectedRecordCounter = 0;
- return recordsProcessed;
- }
-
- private synchronized CompletableFuture<Long> getExpectedRecord() {
- return expectedRecord;
- }
-
- private synchronized void finishProcessingExpectedRecords() {
- checkState(expectedRecord.isDone());
- checkState(!recordsProcessed.isDone());
-
- recordsProcessed.complete(null);
- expectedRecord = new CompletableFuture<>();
- recordsProcessed = new CompletableFuture<>();
- }
-
- @Override
- public void go() throws Exception {
- try {
- while (running) {
- readRecords(getExpectedRecord().get());
- finishProcessingExpectedRecords();
- }
- }
- catch (InterruptedException e) {
- if (running) {
- throw e;
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- protected abstract void readRecords(long lastExpectedRecord) throws Exception;
-
- public void shutdown() {
- running = false;
- interrupt();
- expectedRecord.complete(0L);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java
deleted file mode 100644
index 848c018..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.benchmark;
-
-import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.types.LongValue;
-
-/**
- * {@link ReceiverThread} that deserialize incoming messages.
- */
-public class SerializingLongReceiver extends ReceiverThread {
-
- private final MutableRecordReader<LongValue> reader;
-
- @SuppressWarnings("WeakerAccess")
- public SerializingLongReceiver(InputGate inputGate, int expectedRepetitionsOfExpectedRecord) {
- super(expectedRepetitionsOfExpectedRecord);
- this.reader = new MutableRecordReader<>(
- inputGate,
- new String[]{
- EnvironmentInformation.getTemporaryFileDirectory()
- });
- }
-
- protected void readRecords(long lastExpectedRecord) throws Exception {
- LOG.debug("readRecords(lastExpectedRecord = {})", lastExpectedRecord);
- final LongValue value = new LongValue();
-
- while (running && reader.next(value)) {
- final long ts = value.getValue();
- if (ts == lastExpectedRecord) {
- expectedRecordCounter++;
- if (expectedRecordCounter == expectedRepetitionsOfExpectedRecord) {
- break;
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
new file mode 100644
index 0000000..e6cc2d5
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.types.LongValue;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Wrapping thread around {@link RecordWriter} that sends a fixed number of <tt>LongValue(0)</tt>
+ * records.
+ */
+public class LongRecordWriterThread extends CheckedThread {
+ private final RecordWriter<LongValue> recordWriter;
+
+ /**
+ * Future to wait on a definition of the number of records to send.
+ */
+ private CompletableFuture<Long> recordsToSend = new CompletableFuture<>();
+
+ private volatile boolean running = true;
+
+ public LongRecordWriterThread(RecordWriter<LongValue> recordWriter) {
+ this.recordWriter = checkNotNull(recordWriter);
+ }
+
+ public void shutdown() {
+ running = false;
+ recordsToSend.complete(0L);
+ }
+
+ /**
+ * Initializes the record writer thread with this many numbers to send.
+ *
+ * <p>If the thread was already started, if may now continue.
+ *
+ * @param records
+ * number of records to send
+ */
+ public synchronized void setRecordsToSend(long records) {
+ checkState(!recordsToSend.isDone());
+ recordsToSend.complete(records);
+ }
+
+ private synchronized CompletableFuture<Long> getRecordsToSend() {
+ return recordsToSend;
+ }
+
+ private synchronized void finishSendingRecords() {
+ recordsToSend = new CompletableFuture<>();
+ }
+
+ @Override
+ public void go() throws Exception {
+ while (running) {
+ sendRecords(getRecordsToSend().get());
+ }
+ }
+
+ private void sendRecords(long records) throws IOException, InterruptedException {
+ LongValue value = new LongValue(0);
+
+ for (int i = 1; i < records; i++) {
+ recordWriter.emit(value);
+ }
+ value.setValue(records);
+ recordWriter.broadcastEmit(value);
+ recordWriter.flush();
+
+ finishSendingRecords();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/ReceiverThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/ReceiverThread.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/ReceiverThread.java
new file mode 100644
index 0000000..126efef
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/ReceiverThread.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.apache.flink.core.testutils.CheckedThread;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This class waits for {@code expectedRepetitionsOfExpectedRecord} number of occurrences of the
+ * {@code expectedRecord}. {@code expectedRepetitionsOfExpectedRecord} is correlated with number of input channels.
+ */
+public abstract class ReceiverThread extends CheckedThread {
+ protected static final Logger LOG = LoggerFactory.getLogger(ReceiverThread.class);
+
+ protected final int expectedRepetitionsOfExpectedRecord;
+
+ protected int expectedRecordCounter;
+ protected CompletableFuture<Long> expectedRecord = new CompletableFuture<>();
+ protected CompletableFuture<?> recordsProcessed = new CompletableFuture<>();
+
+ protected volatile boolean running;
+
+ ReceiverThread(int expectedRepetitionsOfExpectedRecord) {
+ setName(this.getClass().getName());
+
+ this.expectedRepetitionsOfExpectedRecord = expectedRepetitionsOfExpectedRecord;
+ this.running = true;
+ }
+
+ public synchronized CompletableFuture<?> setExpectedRecord(long record) {
+ checkState(!expectedRecord.isDone());
+ checkState(!recordsProcessed.isDone());
+ expectedRecord.complete(record);
+ expectedRecordCounter = 0;
+ return recordsProcessed;
+ }
+
+ private synchronized CompletableFuture<Long> getExpectedRecord() {
+ return expectedRecord;
+ }
+
+ private synchronized void finishProcessingExpectedRecords() {
+ checkState(expectedRecord.isDone());
+ checkState(!recordsProcessed.isDone());
+
+ recordsProcessed.complete(null);
+ expectedRecord = new CompletableFuture<>();
+ recordsProcessed = new CompletableFuture<>();
+ }
+
+ @Override
+ public void go() throws Exception {
+ try {
+ while (running) {
+ readRecords(getExpectedRecord().get());
+ finishProcessingExpectedRecords();
+ }
+ }
+ catch (InterruptedException e) {
+ if (running) {
+ throw e;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ protected abstract void readRecords(long lastExpectedRecord) throws Exception;
+
+ public void shutdown() {
+ running = false;
+ interrupt();
+ expectedRecord.complete(0L);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java
new file mode 100644
index 0000000..580612c
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.types.LongValue;
+
+/**
+ * {@link ReceiverThread} that deserialize incoming messages.
+ */
+public class SerializingLongReceiver extends ReceiverThread {
+
+ private final MutableRecordReader<LongValue> reader;
+
+ @SuppressWarnings("WeakerAccess")
+ public SerializingLongReceiver(InputGate inputGate, int expectedRepetitionsOfExpectedRecord) {
+ super(expectedRepetitionsOfExpectedRecord);
+ this.reader = new MutableRecordReader<>(
+ inputGate,
+ new String[]{
+ EnvironmentInformation.getTemporaryFileDirectory()
+ });
+ }
+
+ protected void readRecords(long lastExpectedRecord) throws Exception {
+ LOG.debug("readRecords(lastExpectedRecord = {})", lastExpectedRecord);
+ final LongValue value = new LongValue();
+
+ while (running && reader.next(value)) {
+ final long ts = value.getValue();
+ if (ts == lastExpectedRecord) {
+ expectedRecordCounter++;
+ if (expectedRecordCounter == expectedRepetitionsOfExpectedRecord) {
+ break;
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index acbbdf8..83508ea 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -18,23 +18,262 @@
package org.apache.flink.streaming.runtime.io.benchmark;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionLocation;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
-import org.apache.flink.runtime.io.network.benchmark.NetworkBenchmarkEnvironment;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+
+import static org.apache.flink.util.ExceptionUtils.suppressExceptions;
+
/**
- * Context for stream network benchmarks executed by the external
+ * Context for network benchmarks executed by the external
* <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project.
*/
-public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> extends NetworkBenchmarkEnvironment<T> {
+public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
+
+ private static final int BUFFER_SIZE = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue();
+
+ private static final int NUM_SLOTS_AND_THREADS = 1;
+
+ private static final InetAddress LOCAL_ADDRESS;
+
+ static {
+ try {
+ LOCAL_ADDRESS = InetAddress.getLocalHost();
+ } catch (UnknownHostException e) {
+ throw new Error(e);
+ }
+ }
+
+ protected final JobID jobId = new JobID();
+ protected final IntermediateDataSetID dataSetID = new IntermediateDataSetID();
+ protected final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
+
+ protected NetworkEnvironment senderEnv;
+ protected NetworkEnvironment receiverEnv;
+ protected IOManager ioManager;
+
+ protected int channels;
+
+ protected ResultPartitionID[] partitionIds;
+
+ public void setUp(int writers, int channels) throws Exception {
+ this.channels = channels;
+ this.partitionIds = new ResultPartitionID[writers];
+
+ int bufferPoolSize = Math.max(2048, writers * channels * 4);
+ senderEnv = createNettyNetworkEnvironment(bufferPoolSize);
+ receiverEnv = createNettyNetworkEnvironment(bufferPoolSize);
+ ioManager = new IOManagerAsync();
+
+ senderEnv.start();
+ receiverEnv.start();
+
+ generatePartitionIds();
+ }
+
+ public void tearDown() {
+ suppressExceptions(senderEnv::shutdown);
+ suppressExceptions(receiverEnv::shutdown);
+ suppressExceptions(ioManager::shutdown);
+ }
+
+ public SerializingLongReceiver createReceiver() throws Exception {
+ TaskManagerLocation senderLocation = new TaskManagerLocation(
+ ResourceID.generate(),
+ LOCAL_ADDRESS,
+ senderEnv.getConnectionManager().getDataPort());
+
+ InputGate receiverGate = createInputGate(
+ jobId,
+ dataSetID,
+ executionAttemptID,
+ senderLocation,
+ receiverEnv,
+ channels);
+
+ SerializingLongReceiver receiver = new SerializingLongReceiver(receiverGate, channels * partitionIds.length);
+
+ receiver.start();
+ return receiver;
+ }
+
+ public StreamRecordWriter<T> createRecordWriter(int partitionIndex, long flushTimeout) throws Exception {
+ ResultPartitionWriter sender = createResultPartition(jobId, partitionIds[partitionIndex], senderEnv, channels);
+ return new StreamRecordWriter<>(sender, new RoundRobinChannelSelector<T>(), flushTimeout);
+ }
+
+ private void generatePartitionIds() throws Exception {
+ for (int writer = 0; writer < partitionIds.length; writer++) {
+ partitionIds[writer] = new ResultPartitionID();
+ }
+ }
+
+ private NetworkEnvironment createNettyNetworkEnvironment(
+ @SuppressWarnings("SameParameterValue") int bufferPoolSize) throws Exception {
+
+ final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, BUFFER_SIZE);
+
+ final NettyConnectionManager nettyConnectionManager = new NettyConnectionManager(
+ new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, NUM_SLOTS_AND_THREADS, new Configuration()));
+
+ return new NetworkEnvironment(
+ bufferPool,
+ nettyConnectionManager,
+ new ResultPartitionManager(),
+ new TaskEventDispatcher(),
+ new KvStateRegistry(),
+ null,
+ null,
+ IOMode.SYNC,
+ TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
+ TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
+ TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(),
+ TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue());
+ }
+
+ protected ResultPartitionWriter createResultPartition(
+ JobID jobId,
+ ResultPartitionID partitionId,
+ NetworkEnvironment environment,
+ int channels) throws Exception {
+
+ ResultPartition resultPartition = new ResultPartition(
+ "sender task",
+ new NoOpTaskActions(),
+ jobId,
+ partitionId,
+ ResultPartitionType.PIPELINED_BOUNDED,
+ channels,
+ 1,
+ environment.getResultPartitionManager(),
+ new NoOpResultPartitionConsumableNotifier(),
+ ioManager,
+ false);
+
+ // similar to NetworkEnvironment#registerTask()
+ int numBuffers = resultPartition.getNumberOfSubpartitions() *
+ TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() +
+ TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue();
+
+ BufferPool bufferPool = environment.getNetworkBufferPool().createBufferPool(channels, numBuffers);
+ resultPartition.registerBufferPool(bufferPool);
+
+ environment.getResultPartitionManager().registerResultPartition(resultPartition);
+
+ return resultPartition;
+ }
+
+ private InputGate createInputGate(
+ JobID jobId,
+ IntermediateDataSetID dataSetID,
+ ExecutionAttemptID executionAttemptID,
+ final TaskManagerLocation senderLocation,
+ NetworkEnvironment environment,
+ final int channels) throws IOException {
+
+ InputGate[] gates = new InputGate[channels];
+ for (int channel = 0; channel < channels; ++channel) {
+ int finalChannel = channel;
+ InputChannelDeploymentDescriptor[] channelDescriptors = Arrays.stream(partitionIds)
+ .map(partitionId -> new InputChannelDeploymentDescriptor(
+ partitionId,
+ ResultPartitionLocation.createRemote(new ConnectionID(senderLocation, finalChannel))))
+ .toArray(InputChannelDeploymentDescriptor[]::new);
+
+ final InputGateDeploymentDescriptor gateDescriptor = new InputGateDeploymentDescriptor(
+ dataSetID,
+ ResultPartitionType.PIPELINED_BOUNDED,
+ channel,
+ channelDescriptors);
+
+ SingleInputGate gate = SingleInputGate.create(
+ "receiving task[" + channel + "]",
+ jobId,
+ executionAttemptID,
+ gateDescriptor,
+ environment,
+ new NoOpTaskActions(),
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+
+ // similar to NetworkEnvironment#registerTask()
+ int numBuffers = gate.getNumberOfInputChannels() *
+ TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() +
+ TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue();
+
+ BufferPool bufferPool =
+ environment.getNetworkBufferPool().createBufferPool(gate.getNumberOfInputChannels(), numBuffers);
+
+ gate.setBufferPool(bufferPool);
+ gates[channel] = gate;
+ }
+
+ if (channels > 1) {
+ return new UnionInputGate(gates);
+ } else {
+ return gates[0];
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Mocks
+ // ------------------------------------------------------------------------
+
+ /**
+ * A dummy implementation of the {@link TaskActions}. We implement this here rather than using Mockito
+ * to avoid using mockito in this benchmark class.
+ */
+ private static final class NoOpTaskActions implements TaskActions {
+
+ @Override
+ public void triggerPartitionProducerStateCheck(
+ JobID jobId,
+ IntermediateDataSetID intermediateDataSetId,
+ ResultPartitionID resultPartitionId) {}
+
+ @Override
+ public void failExternally(Throwable cause) {}
+ }
+
+ private static final class NoOpResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
- public RecordWriter<T> createStreamRecordWriter(int partitionIndex, long flushTimeout)
- throws Exception {
- ResultPartitionWriter sender =
- createResultPartition(jobId, partitionIds[partitionIndex], senderEnv, channels);
- return new StreamRecordWriter<>(sender, new RoundRobinChannelSelector<>(), flushTimeout);
+ @Override
+ public void notifyPartitionConsumable(JobID j, ResultPartitionID p, TaskActions t) {}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
index 9286485..843d3e2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.runtime.io.benchmark;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.benchmark.ReceiverThread;
import org.apache.flink.types.LongValue;
import java.util.concurrent.CompletableFuture;
@@ -74,7 +73,7 @@ public class StreamNetworkPointToPointBenchmark {
environment.setUp(1, 1);
receiver = environment.createReceiver();
- recordWriter = environment.createStreamRecordWriter(0, flushTimeout);
+ recordWriter = environment.createRecordWriter(0, flushTimeout);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
new file mode 100644
index 0000000..3f41b00
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.apache.flink.types.LongValue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Network throughput benchmarks executed by the external
+ * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project.
+ */
+public class StreamNetworkThroughputBenchmark {
+ private static final long RECEIVER_TIMEOUT = 30_000;
+
+ private StreamNetworkBenchmarkEnvironment<LongValue> environment;
+ private ReceiverThread receiver;
+ private LongRecordWriterThread[] writerThreads;
+
+ /**
+ * Executes the throughput benchmark with the given number of records.
+ *
+ * @param records
+ * records to pass through the network stack
+ */
+ public void executeBenchmark(long records) throws Exception {
+ final LongValue value = new LongValue();
+ value.setValue(0);
+
+ long lastRecord = records / writerThreads.length;
+ CompletableFuture<?> recordsReceived = receiver.setExpectedRecord(lastRecord);
+
+ for (LongRecordWriterThread writerThread : writerThreads) {
+ writerThread.setRecordsToSend(lastRecord);
+ }
+
+ recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Initializes the throughput benchmark with the given parameters.
+ *
+ * @param recordWriters
+ * number of senders, i.e.
+ * {@link org.apache.flink.runtime.io.network.api.writer.RecordWriter} instances
+ * @param channels
+ * number of outgoing channels / receivers
+ */
+ public void setUp(int recordWriters, int channels, int flushTimeout) throws Exception {
+ environment = new StreamNetworkBenchmarkEnvironment<>();
+ environment.setUp(recordWriters, channels);
+ receiver = environment.createReceiver();
+ writerThreads = new LongRecordWriterThread[recordWriters];
+ for (int writer = 0; writer < recordWriters; writer++) {
+ writerThreads[writer] = new LongRecordWriterThread(environment.createRecordWriter(writer, flushTimeout));
+ writerThreads[writer].start();
+ }
+ }
+
+ /**
+ * Shuts down a benchmark previously set up via {@link #setUp}.
+ *
+ * <p>This will wait for all senders to finish but timeout with an exception after 5 seconds.
+ */
+ public void tearDown() throws Exception {
+ for (LongRecordWriterThread writerThread : writerThreads) {
+ writerThread.shutdown();
+ writerThread.sync(5000);
+ }
+ environment.tearDown();
+ receiver.shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java
new file mode 100644
index 0000000..8af8148
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.junit.Test;
+
+/**
+ * Tests for various network benchmarks based on {@link StreamNetworkThroughputBenchmark}.
+ */
+public class StreamNetworkThroughputBenchmarkTests {
+ @Test
+ public void pointToPointBenchmark() throws Exception {
+ StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
+ benchmark.setUp(1, 1, 100);
+ try {
+ benchmark.executeBenchmark(1_000);
+ }
+ finally {
+ benchmark.tearDown();
+ }
+ }
+
+ @Test
+ public void pointToMultiPointBenchmark() throws Exception {
+ StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
+ benchmark.setUp(1, 100, 100);
+ try {
+ benchmark.executeBenchmark(1_000);
+ }
+ finally {
+ benchmark.tearDown();
+ }
+ }
+
+ @Test
+ public void multiPointToPointBenchmark() throws Exception {
+ StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
+ benchmark.setUp(4, 1, 100);
+ try {
+ benchmark.executeBenchmark(1_000);
+ }
+ finally {
+ benchmark.tearDown();
+ }
+ }
+
+ @Test
+ public void multiPointToMultiPointBenchmark() throws Exception {
+ StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
+ benchmark.setUp(4, 100, 100);
+ try {
+ benchmark.executeBenchmark(1_000);
+ }
+ finally {
+ benchmark.tearDown();
+ }
+ }
+}