You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/04 11:30:19 UTC

[flink] 01/06: [hotfix][test] Drop InputGateConcurrentTest

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 42d671c11153e67588f2c80d4d2953c53b6b067d
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Jul 1 14:44:38 2019 +0200

    [hotfix][test] Drop InputGateConcurrentTest
    
    Those tests are mostly superseded by StreamNetworkThroughputBenchmarkTest
    (except for the mixed local/remote workload)
---
 .../network/partition/InputGateConcurrentTest.java | 278 ---------------------
 1 file changed, 278 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
deleted file mode 100644
index c7c93ef..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.core.testutils.CheckedThread;
-import org.apache.flink.runtime.io.network.ConnectionManager;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-
-import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager;
-import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createLocalInputChannel;
-import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel;
-import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createResultPartitionManager;
-import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
-import static org.apache.flink.util.Preconditions.checkState;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
-
-/**
- * Concurrency tests for input gates.
- */
-public class InputGateConcurrentTest {
-
-	@Test
-	public void testConsumptionWithLocalChannels() throws Exception {
-		final int numberOfChannels = 11;
-		final int buffersPerChannel = 1000;
-
-		final ResultPartition resultPartition = mock(ResultPartition.class);
-
-		final PipelinedSubpartition[] partitions = new PipelinedSubpartition[numberOfChannels];
-		final Source[] sources = new Source[numberOfChannels];
-
-		final ResultPartitionManager resultPartitionManager = createResultPartitionManager(partitions);
-
-		final SingleInputGate gate = createSingleInputGate(numberOfChannels);
-
-		for (int i = 0; i < numberOfChannels; i++) {
-			createLocalInputChannel(gate, i, resultPartitionManager);
-			partitions[i] = new PipelinedSubpartition(0, resultPartition);
-			sources[i] = new PipelinedSubpartitionSource(partitions[i]);
-		}
-
-		ProducerThread producer = new ProducerThread(sources, numberOfChannels * buffersPerChannel, 4, 10);
-		ConsumerThread consumer = new ConsumerThread(gate, numberOfChannels * buffersPerChannel);
-		producer.start();
-		consumer.start();
-
-		// the 'sync()' call checks for exceptions and failed assertions
-		producer.sync();
-		consumer.sync();
-	}
-
-	@Test
-	public void testConsumptionWithRemoteChannels() throws Exception {
-		final int numberOfChannels = 11;
-		final int buffersPerChannel = 1000;
-
-		final ConnectionManager connManager = createDummyConnectionManager();
-		final Source[] sources = new Source[numberOfChannels];
-
-		final SingleInputGate gate = createSingleInputGate(numberOfChannels);
-
-		for (int i = 0; i < numberOfChannels; i++) {
-			RemoteInputChannel channel = createRemoteInputChannel(gate, i, connManager);
-			sources[i] = new RemoteChannelSource(channel);
-		}
-
-		ProducerThread producer = new ProducerThread(sources, numberOfChannels * buffersPerChannel, 4, 10);
-		ConsumerThread consumer = new ConsumerThread(gate, numberOfChannels * buffersPerChannel);
-		producer.start();
-		consumer.start();
-
-		// the 'sync()' call checks for exceptions and failed assertions
-		producer.sync();
-		consumer.sync();
-	}
-
-	@Test
-	public void testConsumptionWithMixedChannels() throws Exception {
-		final int numberOfChannels = 61;
-		final int numLocalChannels = 20;
-		final int buffersPerChannel = 1000;
-
-		// fill the local/remote decision
-		List<Boolean> localOrRemote = new ArrayList<>(numberOfChannels);
-		for (int i = 0; i < numberOfChannels; i++) {
-			localOrRemote.add(i < numLocalChannels);
-		}
-		Collections.shuffle(localOrRemote);
-
-		final ConnectionManager connManager = createDummyConnectionManager();
-		final ResultPartition resultPartition = mock(ResultPartition.class);
-
-		final PipelinedSubpartition[] localPartitions = new PipelinedSubpartition[numLocalChannels];
-		final ResultPartitionManager resultPartitionManager = createResultPartitionManager(localPartitions);
-
-		final Source[] sources = new Source[numberOfChannels];
-
-		final SingleInputGate gate = createSingleInputGate(numberOfChannels);
-
-		for (int i = 0, local = 0; i < numberOfChannels; i++) {
-			if (localOrRemote.get(i)) {
-				// local channel
-				PipelinedSubpartition psp = new PipelinedSubpartition(0, resultPartition);
-				localPartitions[local++] = psp;
-				sources[i] = new PipelinedSubpartitionSource(psp);
-
-				createLocalInputChannel(gate, i, resultPartitionManager);
-			}
-			else {
-				//remote channel
-				RemoteInputChannel channel = createRemoteInputChannel(gate, i, connManager);
-				sources[i] = new RemoteChannelSource(channel);
-			}
-		}
-
-		ProducerThread producer = new ProducerThread(sources, numberOfChannels * buffersPerChannel, 4, 10);
-		ConsumerThread consumer = new ConsumerThread(gate, numberOfChannels * buffersPerChannel);
-		producer.start();
-		consumer.start();
-
-		// the 'sync()' call checks for exceptions and failed assertions
-		producer.sync();
-		consumer.sync();
-	}
-
-	// ------------------------------------------------------------------------
-	//  testing threads
-	// ------------------------------------------------------------------------
-
-	private abstract static class Source {
-
-		abstract void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception;
-
-		abstract void flush();
-	}
-
-	private static class PipelinedSubpartitionSource extends Source {
-
-		final PipelinedSubpartition partition;
-
-		PipelinedSubpartitionSource(PipelinedSubpartition partition) {
-			this.partition = partition;
-		}
-
-		@Override
-		void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception {
-			partition.add(bufferConsumer);
-		}
-
-		@Override
-		void flush() {
-			partition.flush();
-		}
-	}
-
-	private static class RemoteChannelSource extends Source {
-
-		final RemoteInputChannel channel;
-		private int seq = 0;
-
-		RemoteChannelSource(RemoteInputChannel channel) {
-			this.channel = channel;
-		}
-
-		@Override
-		void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception {
-			try {
-				Buffer buffer = bufferConsumer.build();
-				checkState(bufferConsumer.isFinished(), "Handling of non finished buffers is not yet implemented");
-				channel.onBuffer(buffer, seq++, -1);
-			}
-			finally {
-				bufferConsumer.close();
-			}
-		}
-
-		@Override
-		void flush() {
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  testing threads
-	// ------------------------------------------------------------------------
-
-	private static class ProducerThread extends CheckedThread {
-
-		private final Random rnd = new Random();
-		private final Source[] sources;
-		private final int numTotal;
-		private final int maxChunk;
-		private final int yieldAfter;
-
-		ProducerThread(Source[] sources, int numTotal, int maxChunk, int yieldAfter) {
-			super("producer");
-			this.sources = sources;
-			this.numTotal = numTotal;
-			this.maxChunk = maxChunk;
-			this.yieldAfter = yieldAfter;
-		}
-
-		@Override
-		public void go() throws Exception {
-			final BufferConsumer bufferConsumer = BufferBuilderTestUtils.createFilledBufferConsumer(100);
-			int nextYield = numTotal - yieldAfter;
-
-			for (int i = numTotal; i > 0;) {
-				final int nextChannel = rnd.nextInt(sources.length);
-				final int chunk = Math.min(i, rnd.nextInt(maxChunk) + 1);
-
-				final Source next = sources[nextChannel];
-
-				for (int k = chunk; k > 0; --k) {
-					next.addBufferConsumer(bufferConsumer.copy());
-				}
-
-				i -= chunk;
-
-				if (i <= nextYield) {
-					nextYield -= yieldAfter;
-					//noinspection CallToThreadYield
-					Thread.yield();
-				}
-			}
-
-			for (Source source : sources) {
-				source.flush();
-			}
-		}
-	}
-
-	private static class ConsumerThread extends CheckedThread {
-
-		private final SingleInputGate gate;
-		private final int numBuffers;
-
-		ConsumerThread(SingleInputGate gate, int numBuffers) {
-			super("consumer");
-			this.gate = gate;
-			this.numBuffers = numBuffers;
-		}
-
-		@Override
-		public void go() throws Exception {
-			for (int i = numBuffers; i > 0; --i) {
-				assertNotNull(gate.getNext());
-			}
-		}
-	}
-}