You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/09 14:15:58 UTC

[GitHub] pnowojski closed pull request #7069: [FLINK-10835][network] Remove duplicated round-robin ChannelSelector implementation

pnowojski closed pull request #7069: [FLINK-10835][network] Remove duplicated round-robin ChannelSelector implementation
URL: https://github.com/apache/flink/pull/7069
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
index c707d47d9c6..96a4e1a081f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
@@ -29,26 +29,12 @@
  */
 public class RoundRobinChannelSelector<T extends IOReadableWritable> implements ChannelSelector<T> {
 
-	/**
-	 * Stores the index of the channel to send the next record to.
-	 */
-	private final int[] nextChannelToSendTo = new int[1];
-
-	/**
-	 * Constructs a new default channel selector.
-	 */
-	public RoundRobinChannelSelector() {
-		this.nextChannelToSendTo[0] = 0;
-	}
+	/** Stores the index of the channel to send the next record to. */
+	private final int[] nextChannelToSendTo = new int[] { -1 };
 
 	@Override
 	public int[] selectChannels(final T record, final int numberOfOutputChannels) {
-
-		int newChannel = ++this.nextChannelToSendTo[0];
-		if (newChannel >= numberOfOutputChannels) {
-			this.nextChannelToSendTo[0] = 0;
-		}
-
-		return this.nextChannelToSendTo;
+		nextChannelToSendTo[0] = (nextChannelToSendTo[0] + 1) % numberOfOutputChannels;
+		return nextChannelToSendTo;
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java
index e090c7fd27f..61bcde9e6cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network;
 
+import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
 import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
 import org.apache.flink.types.StringValue;
 
@@ -35,17 +36,22 @@
 	 */
 	@Test
 	public void channelSelect() {
-
 		final StringValue dummyRecord = new StringValue("abc");
-		final RoundRobinChannelSelector<StringValue> selector = new RoundRobinChannelSelector<StringValue>();
-		// Test with two channels
-		final int numberOfOutputChannels = 2;
-		int[] selectedChannels = selector.selectChannels(dummyRecord, numberOfOutputChannels);
-		assertEquals(1, selectedChannels.length);
-		assertEquals(1, selectedChannels[0]);
-		selectedChannels = selector.selectChannels(dummyRecord, numberOfOutputChannels);
-		assertEquals(1, selectedChannels.length);
-		assertEquals(0, selectedChannels[0]);
+		final RoundRobinChannelSelector<StringValue> selector = new RoundRobinChannelSelector<>();
+		final int numberOfChannels = 2;
+
+		assertSelectedChannel(selector, dummyRecord, numberOfChannels, 0);
+		assertSelectedChannel(selector, dummyRecord, numberOfChannels, 1);
 	}
 
+	private void assertSelectedChannel(
+		ChannelSelector<StringValue> selector,
+		StringValue record,
+		int numberOfChannels,
+		int expectedChannel) {
+
+		int[] actualResult = selector.selectChannels(record, numberOfChannels);
+		assertEquals(1, actualResult.length);
+		assertEquals(expectedChannel, actualResult[0]);
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index ed9f4cc3026..52796024ac9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -210,7 +210,7 @@ public void testBroadcastEventNoRecords() throws Exception {
 		TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
 
 		ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
-		RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>());
+		RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobinChannelSelector<>());
 		CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L, CheckpointOptions.forCheckpointWithDefaultLocation());
 
 		// No records emitted yet, broadcast should not request a buffer
@@ -247,7 +247,7 @@ public void testBroadcastEventMixedRecords() throws Exception {
 		TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
 
 		ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
-		RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>());
+		RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobinChannelSelector<>());
 		CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L, CheckpointOptions.forCheckpointWithDefaultLocation());
 
 		// Emit records on some channels first (requesting buffers), then
@@ -593,20 +593,6 @@ public void read(DataInputView in) throws IOException {
 		}
 	}
 
-	/**
-	 * RoundRobin channel selector starting at 0 ({@link RoundRobinChannelSelector} starts at 1).
-	 */
-	private static class RoundRobin<T extends IOReadableWritable> implements ChannelSelector<T> {
-
-		private int[] nextChannel = new int[] { -1 };
-
-		@Override
-		public int[] selectChannels(final T record, final int numberOfOutputChannels) {
-			nextChannel[0] = (nextChannel[0] + 1) % numberOfOutputChannels;
-			return nextChannel;
-		}
-	}
-
 	/**
 	 * Broadcast channel selector that selects all the output channels.
 	 */


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services