You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/01/30 12:56:54 UTC

[flink] branch master updated: [FLINK-10662][network] Refactor the ChannelSelector interface for single selected channel (#7199)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new be0aea7  [FLINK-10662][network] Refactor the ChannelSelector interface for single selected channel (#7199)
be0aea7 is described below

commit be0aea724956ead1ee625bd327c9d43bc7f4a63b
Author: zhijiang <wa...@aliyun.com>
AuthorDate: Wed Jan 30 20:56:45 2019 +0800

    [FLINK-10662][network] Refactor the ChannelSelector interface for single selected channel (#7199)
    
    [FLINK-10662][network] Refactor the ChannelSelector interface for single selected channel
    
    In ChannelSelector#selectChannels, it would return an array for selected channels. But considering specific implementations, only BroadcastPartitioner would select all the channels, and other implementations will select one channel.
    
    So we can simple this interface to return single channel index for benefiting performance, and in the future we can specialize the BroadcastPartitioner in a more efficient way.
---
 ...nelSelector.java => BroadcastRecordWriter.java} | 36 ++++-----
 .../io/network/api/writer/ChannelSelector.java     | 18 +++--
 .../io/network/api/writer/RecordWriter.java        | 60 +++++++++------
 .../api/writer/RoundRobinChannelSelector.java      | 11 ++-
 .../apache/flink/runtime/operators/BatchTask.java  |  6 +-
 .../runtime/operators/shipping/OutputEmitter.java  | 88 +++++++++-------------
 .../io/network/DefaultChannelSelectorTest.java     |  5 +-
 .../io/network/api/writer/RecordWriterTest.java    | 39 ++--------
 .../runtime/operators/util/OutputEmitterTest.java  | 38 +++++-----
 .../runtime/partitioner/BroadcastPartitioner.java  | 21 +++---
 .../partitioner/CustomPartitionerWrapper.java      |  7 +-
 .../runtime/partitioner/ForwardPartitioner.java    |  6 +-
 .../runtime/partitioner/GlobalPartitioner.java     |  6 +-
 .../partitioner/KeyGroupStreamPartitioner.java     |  7 +-
 .../runtime/partitioner/RebalancePartitioner.java  | 10 +--
 .../runtime/partitioner/RescalePartitioner.java    | 11 ++-
 .../runtime/partitioner/ShufflePartitioner.java    |  7 +-
 .../runtime/partitioner/StreamPartitioner.java     |  5 ++
 .../flink/streaming/runtime/tasks/StreamTask.java  |  2 +-
 .../partitioner/BroadcastPartitionerTest.java      | 45 ++++-------
 .../partitioner/ForwardPartitionerTest.java        |  6 +-
 .../runtime/partitioner/GlobalPartitionerTest.java |  6 +-
 .../partitioner/KeyGroupStreamPartitionerTest.java | 24 ++----
 .../partitioner/RebalancePartitionerTest.java      |  7 +-
 .../partitioner/RescalePartitionerTest.java        |  4 +-
 .../partitioner/ShufflePartitionerTest.java        | 13 ++--
 .../runtime/partitioner/StreamPartitionerTest.java |  9 +--
 27 files changed, 225 insertions(+), 272 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
similarity index 53%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
index 5da9534..82cc167 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
@@ -20,28 +20,28 @@ package org.apache.flink.runtime.io.network.api.writer;
 
 import org.apache.flink.core.io.IOReadableWritable;
 
-/**
- * This is the default implementation of the {@link ChannelSelector} interface. It represents a simple round-robin
- * strategy, i.e. regardless of the record every attached exactly one output channel is selected at a time.
+import java.io.IOException;
 
- * @param <T>
- *        the type of record which is sent through the attached output gate
+/**
+ * A special record-oriented runtime result writer only for broadcast mode.
+ *
+ * <p>The BroadcastRecordWriter extends the {@link RecordWriter} and handles {@link #emit(IOReadableWritable)}
+ * operation via {@link #broadcastEmit(IOReadableWritable)} directly in a more efficient way.
+ *
+ * @param <T> the type of the record that can be emitted with this record writer
  */
-public class RoundRobinChannelSelector<T extends IOReadableWritable> implements ChannelSelector<T> {
-
-	/** Stores the index of the channel to send the next record to. */
-	private final int[] nextChannelToSendTo = new int[] { -1 };
-
-	private int numberOfChannels;
-
-	@Override
-	public void setup(int numberOfChannels) {
-		this.numberOfChannels = numberOfChannels;
+public class BroadcastRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
+
+	public BroadcastRecordWriter(
+			ResultPartitionWriter writer,
+			ChannelSelector<T> channelSelector,
+			long timeout,
+			String taskName) {
+		super(writer, channelSelector, timeout, taskName);
 	}
 
 	@Override
-	public int[] selectChannels(final T record) {
-		nextChannelToSendTo[0] = (nextChannelToSendTo[0] + 1) % numberOfChannels;
-		return nextChannelToSendTo;
+	public void emit(T record) throws IOException, InterruptedException {
+		broadcastEmit(record);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelector.java
index 403b75c..b75fc3a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelector.java
@@ -37,12 +37,20 @@ public interface ChannelSelector<T extends IOReadableWritable> {
 	void setup(int numberOfChannels);
 
 	/**
-	 * Returns the logical channel indexes, to which the given record should be
-	 * written.
+	 * Returns the logical channel index, to which the given record should be written. It is
+	 * illegal to call this method for broadcast channel selectors and this method can remain
+	 * not implemented in that case (for example by throwing {@link UnsupportedOperationException}).
 	 *
 	 * @param record the record to determine the output channels for.
-	 * @return an array of integer numbers which indicate the indices of the output
-	 * 		channels through which the record shall be forwarded.
+	 * @return an integer number which indicates the index of the output
+	 * 		channel through which the record shall be forwarded.
 	 */
-	int[] selectChannels(T record);
+	int selectChannel(T record);
+
+	/**
+	 * Returns whether the channel selector always selects all the output channels.
+	 *
+	 * @return true if the selector is for broadcast mode.
+	 */
+	boolean isBroadcast();
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index d1f8d3d..8817a3f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -88,12 +88,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 	private Throwable flusherException;
 
 	public RecordWriter(ResultPartitionWriter writer) {
-		this(writer, new RoundRobinChannelSelector<T>());
-	}
-
-	@SuppressWarnings("unchecked")
-	public RecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector) {
-		this(writer, channelSelector, -1, null);
+		this(writer, new RoundRobinChannelSelector<T>(), -1, null);
 	}
 
 	public RecordWriter(
@@ -130,7 +125,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 
 	public void emit(T record) throws IOException, InterruptedException {
 		checkErroneous();
-		emit(record, channelSelector.selectChannels(record));
+		emit(record, channelSelector.selectChannel(record));
 	}
 
 	/**
@@ -139,25 +134,10 @@ public class RecordWriter<T extends IOReadableWritable> {
 	 */
 	public void broadcastEmit(T record) throws IOException, InterruptedException {
 		checkErroneous();
-		emit(record, broadcastChannels);
-	}
-
-	/**
-	 * This is used to send LatencyMarks to a random target channel.
-	 */
-	public void randomEmit(T record) throws IOException, InterruptedException {
-		checkErroneous();
-		serializer.serializeRecord(record);
-		if (copyFromSerializerToTargetChannel(rng.nextInt(numberOfChannels))) {
-			serializer.prune();
-		}
-	}
-
-	private void emit(T record, int[] targetChannels) throws IOException, InterruptedException {
 		serializer.serializeRecord(record);
 
 		boolean pruneAfterCopying = false;
-		for (int channel : targetChannels) {
+		for (int channel : broadcastChannels) {
 			if (copyFromSerializerToTargetChannel(channel)) {
 				pruneAfterCopying = true;
 			}
@@ -170,6 +150,21 @@ public class RecordWriter<T extends IOReadableWritable> {
 	}
 
 	/**
+	 * This is used to send LatencyMarks to a random target channel.
+	 */
+	public void randomEmit(T record) throws IOException, InterruptedException {
+		emit(record, rng.nextInt(numberOfChannels));
+	}
+
+	private void emit(T record, int targetChannel) throws IOException, InterruptedException {
+		serializer.serializeRecord(record);
+
+		if (copyFromSerializerToTargetChannel(targetChannel)) {
+			serializer.prune();
+		}
+	}
+
+	/**
 	 * @param targetChannel
 	 * @return <tt>true</tt> if the intermediate serialization buffer should be pruned
 	 */
@@ -315,6 +310,25 @@ public class RecordWriter<T extends IOReadableWritable> {
 		}
 	}
 
+	public static RecordWriter createRecordWriter(
+			ResultPartitionWriter writer,
+			ChannelSelector channelSelector,
+			long timeout,
+			String taskName) {
+		if (channelSelector.isBroadcast()) {
+			return new BroadcastRecordWriter<>(writer, channelSelector, timeout, taskName);
+		} else {
+			return new RecordWriter<>(writer, channelSelector, timeout, taskName);
+		}
+	}
+
+	public static RecordWriter createRecordWriter(
+			ResultPartitionWriter writer,
+			ChannelSelector channelSelector,
+			String taskName) {
+		return createRecordWriter(writer, channelSelector, -1, taskName);
+	}
+
 	// ------------------------------------------------------------------------
 
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
index 5da9534..40968a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
@@ -30,7 +30,7 @@ import org.apache.flink.core.io.IOReadableWritable;
 public class RoundRobinChannelSelector<T extends IOReadableWritable> implements ChannelSelector<T> {
 
 	/** Stores the index of the channel to send the next record to. */
-	private final int[] nextChannelToSendTo = new int[] { -1 };
+	private int nextChannelToSendTo = -1;
 
 	private int numberOfChannels;
 
@@ -40,8 +40,13 @@ public class RoundRobinChannelSelector<T extends IOReadableWritable> implements
 	}
 
 	@Override
-	public int[] selectChannels(final T record) {
-		nextChannelToSendTo[0] = (nextChannelToSendTo[0] + 1) % numberOfChannels;
+	public int selectChannel(final T record) {
+		nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
 		return nextChannelToSendTo;
 	}
+
+	@Override
+	public boolean isBroadcast() {
+		return false;
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index d5f2fd0..864411c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -1250,8 +1250,10 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 				oe = new OutputEmitter<T>(strategy, indexInSubtaskGroup, comparator, partitioner, dataDist);
 			}
 
-			final RecordWriter<SerializationDelegate<T>> recordWriter =
-					new RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset + i), oe);
+			final RecordWriter<SerializationDelegate<T>> recordWriter = RecordWriter.createRecordWriter(
+				task.getEnvironment().getWriter(outputOffset + i),
+				oe,
+				task.getEnvironment().getTaskInfo().getTaskName());
 
 			recordWriter.setMetricGroup(task.getEnvironment().getMetricGroup().getIOMetricGroup());
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
index 91547e5..227a0c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
@@ -36,13 +36,10 @@ import org.apache.flink.util.MathUtils;
 public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T>> {
 	
 	/** the shipping strategy used by this output emitter */
-	private final ShipStrategyType strategy; 
-
-	/** the reused array defining target channels */
-	private int[] channels;
+	private final ShipStrategyType strategy;
 
 	/** counter to go over channels round robin */
-	private int nextChannelToSendTo = 0;
+	private int nextChannelToSendTo;
 
 	/** the total number of output channels */
 	private int numberOfChannels;
@@ -86,11 +83,14 @@ public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T
 	public OutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator) {
 		this(strategy, 0, comparator, null, null);
 	}
-	
-	
+
 	@SuppressWarnings("unchecked")
-	public OutputEmitter(ShipStrategyType strategy, int indexInSubtaskGroup, 
-							TypeComparator<T> comparator, Partitioner<?> partitioner, DataDistribution distribution) {
+	public OutputEmitter(
+			ShipStrategyType strategy,
+			int indexInSubtaskGroup,
+			TypeComparator<T> comparator,
+			Partitioner<?> partitioner,
+			DataDistribution distribution) {
 		if (strategy == null) { 
 			throw new NullPointerException();
 		}
@@ -101,7 +101,6 @@ public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T
 		this.partitioner = (Partitioner<Object>) partitioner;
 		this.distribution = distribution;
 
-
 		switch (strategy) {
 		case PARTITION_CUSTOM:
 			extractedKeys = new Object[1];
@@ -109,10 +108,8 @@ public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T
 		case PARTITION_HASH:
 		case PARTITION_RANDOM:
 		case PARTITION_FORCED_REBALANCE:
-			channels = new int[1];
 			break;
 		case PARTITION_RANGE:
-			channels = new int[1];
 			if (comparator != null) {
 				this.flatComparators = comparator.getFlatComparators();
 				this.keys = new Object[flatComparators.length];
@@ -139,7 +136,7 @@ public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T
 	}
 
 	@Override
-	public final int[] selectChannels(SerializationDelegate<T> record) {
+	public final int selectChannel(SerializationDelegate<T> record) {
 		switch (strategy) {
 		case FORWARD:
 			return forward();
@@ -148,8 +145,6 @@ public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T
 			return robin(numberOfChannels);
 		case PARTITION_HASH:
 			return hashPartitionDefault(record.getInstance(), numberOfChannels);
-		case BROADCAST:
-			return broadcast(numberOfChannels);
 		case PARTITION_CUSTOM:
 			return customPartition(record.getInstance(), numberOfChannels);
 		case PARTITION_RANGE:
@@ -158,16 +153,24 @@ public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T
 			throw new UnsupportedOperationException("Unsupported distribution strategy: " + strategy.name());
 		}
 	}
+
+	@Override
+	public boolean isBroadcast() {
+		if (strategy == ShipStrategyType.BROADCAST) {
+			return true;
+		} else {
+			return false;
+		}
+	}
 	
 	// --------------------------------------------------------------------------------------------
 
-	private int[] forward() {
-		return this.channels;
+	private int forward() {
+		return 0;
 	}
 
-	private int[] robin(int numberOfChannels) {
-		int nextChannel = this.nextChannelToSendTo;
-
+	private int robin(int numberOfChannels) {
+		int nextChannel = nextChannelToSendTo;
 		if (nextChannel >= numberOfChannels) {
 			if (nextChannel == numberOfChannels) {
 				nextChannel = 0;
@@ -175,37 +178,18 @@ public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T
 				nextChannel %= numberOfChannels;
 			}
 		}
+		nextChannelToSendTo = nextChannel + 1;
 
-		this.channels[0] = nextChannel;
-		this.nextChannelToSendTo = nextChannel + 1;
-
-		return this.channels;
+		return nextChannel;
 	}
 
-	private int[] broadcast(int numberOfChannels) {
-		if (channels == null || channels.length != numberOfChannels) {
-			channels = new int[numberOfChannels];
-			for (int i = 0; i < numberOfChannels; i++) {
-				channels[i] = i;
-			}
-		}
-
-		return channels;
-	}
-
-	private int[] hashPartitionDefault(T record, int numberOfChannels) {
+	private int hashPartitionDefault(T record, int numberOfChannels) {
 		int hash = this.comparator.hash(record);
 
-		this.channels[0] = MathUtils.murmurHash(hash) % numberOfChannels;
-
-		return this.channels;
+		return MathUtils.murmurHash(hash) % numberOfChannels;
 	}
 
-	private final int[] rangePartition(final T record, int numberOfChannels) {
-		if (this.channels == null || this.channels.length != 1) {
-			this.channels = new int[1];
-		}
-
+	private int rangePartition(final T record, int numberOfChannels) {
 		if (this.partitionBoundaries == null) {
 			this.partitionBoundaries = new Object[numberOfChannels - 1][];
 			for (int i = 0; i < numberOfChannels - 1; i++) {
@@ -229,30 +213,26 @@ public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T
 				} else if (result < 0) {
 					high = mid - 1;
 				} else {
-					this.channels[0] = mid;
-					return this.channels;
+					return mid;
 				}
 			}
-			this.channels[0] = low;	// key not found, but the low index is the target
-			// bucket, since the boundaries are the upper bound
-			return this.channels;
+			// key not found, but the low index is the target bucket, since the boundaries are the upper bound
+			return low;
 		} else {
 			throw new IllegalStateException(
 				"The number of channels to partition among is inconsistent with the partitioners state.");
 		}
 	}
 
-	private int[] customPartition(T record, int numberOfChannels) {
-		if (channels == null) {
-			channels = new int[1];
+	private int customPartition(T record, int numberOfChannels) {
+		if (extractedKeys == null) {
 			extractedKeys = new Object[1];
 		}
 
 		try {
 			if (comparator.extractKeys(record, extractedKeys, 0) == 1) {
 				final Object key = extractedKeys[0];
-				channels[0] = partitioner.partition(key, numberOfChannels);
-				return channels;
+				return partitioner.partition(key, numberOfChannels);
 			}
 			else {
 				throw new RuntimeException("Inconsistency in the key comparator - comparator extracted more than one field.");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java
index 4abc67e..f4e4299 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java
@@ -49,8 +49,7 @@ public class DefaultChannelSelectorTest {
 		StringValue record,
 		int expectedChannel) {
 
-		int[] actualResult = selector.selectChannels(record);
-		assertEquals(1, actualResult.length);
-		assertEquals(expectedChannel, actualResult[0]);
+		int actualResult = selector.selectChannel(record);
+		assertEquals(expectedChannel, actualResult);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 170c652..ce21270 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -40,6 +40,8 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.util.DeserializationUtils;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
+import org.apache.flink.runtime.operators.shipping.OutputEmitter;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.testutils.serialization.types.SerializationTestType;
 import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
 import org.apache.flink.testutils.serialization.types.Util;
@@ -210,7 +212,7 @@ public class RecordWriterTest {
 		TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
 
 		ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
-		RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobinChannelSelector<>());
+		RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter);
 		CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L, CheckpointOptions.forCheckpointWithDefaultLocation());
 
 		// No records emitted yet, broadcast should not request a buffer
@@ -247,7 +249,7 @@ public class RecordWriterTest {
 		TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
 
 		ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
-		RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobinChannelSelector<>());
+		RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter);
 		CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L, CheckpointOptions.forCheckpointWithDefaultLocation());
 
 		// Emit records on some channels first (requesting buffers), then
@@ -426,9 +428,8 @@ public class RecordWriterTest {
 
 		final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
 		final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
-		final ChannelSelector selector = new Broadcast<>();
-		final RecordWriter<SerializationTestType> writer = isBroadcastEmit ?
-			new RecordWriter<>(partitionWriter) : new RecordWriter<>(partitionWriter, selector);
+		final ChannelSelector selector = new OutputEmitter(ShipStrategyType.BROADCAST, 0);
+		final RecordWriter<SerializationTestType> writer = RecordWriter.createRecordWriter(partitionWriter, selector, 0, "test");
 		final RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>(
 			new String[]{ tempFolder.getRoot().getAbsolutePath() });
 
@@ -593,34 +594,6 @@ public class RecordWriterTest {
 		}
 	}
 
-	/**
-	 * Broadcast channel selector that selects all the output channels.
-	 */
-	private static class Broadcast<T extends IOReadableWritable> implements ChannelSelector<T> {
-
-		private int[] returnChannel;
-
-		private int numberOfChannels;
-
-		@Override
-		public void setup(int numberOfChannels) {
-			this.numberOfChannels = numberOfChannels;
-		}
-
-		@Override
-		public int[] selectChannels(final T record) {
-			if (returnChannel != null && returnChannel.length == numberOfChannels) {
-				return returnChannel;
-			} else {
-				this.returnChannel = new int[numberOfChannels];
-				for (int i = 0; i < numberOfChannels; i++) {
-					returnChannel[i] = i;
-				}
-				return returnChannel;
-			}
-		}
-	}
-
 	private static class TrackingBufferRecycler implements BufferRecycler {
 		private final ArrayList<MemorySegment> recycledMemorySegments = new ArrayList<>();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
index 0231fbf..271cfcf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
@@ -38,8 +38,8 @@ import org.apache.flink.types.IntValue;
 import org.apache.flink.types.NullKeyFieldException;
 import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
-
 import org.apache.flink.types.Value;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -47,7 +47,9 @@ import java.io.IOException;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class OutputEmitterTest {
@@ -105,7 +107,7 @@ public class OutputEmitterTest {
 			if (toTaskIndex <= i || i < toTaskIndex+extraRecords - numberOfChannels) {
 				assertTrue(hits[i] == (numRecords / numberOfChannels) + 1);
 			} else {
-				assertTrue(hits[i] == numRecords/numberOfChannels);
+				assertTrue(hits[i] == numRecords / numberOfChannels);
 			}
 			totalHitCount += hits[i];
 		}
@@ -160,10 +162,8 @@ public class OutputEmitterTest {
 			record.setField(3, new DoubleValue(i * 3.141d));
 			delegate.setInstance(record);
 
-			int[] channels = selector.selectChannels(delegate);
-			for (int channel : channels) {
-				hits[channel]++;
-			}
+			int channel = selector.selectChannel(delegate);
+			hits[channel]++;
 		}
 
 		int totalHitCount = 0;
@@ -209,7 +209,7 @@ public class OutputEmitterTest {
 
 		try {
 			delegate.setInstance(record);
-			selector.selectChannels(delegate);
+			selector.selectChannel(delegate);
 		} catch (DeserializationException re) {
 			return;
 		}
@@ -237,11 +237,13 @@ public class OutputEmitterTest {
 	}
 
 	private void verifyBroadcastSelectedChannels(int numRecords, int numberOfChannels, Enum recordType) {
-		int[] hits = getSelectedChannelsHitCount(ShipStrategyType.BROADCAST, numRecords, numberOfChannels, recordType);
-
-		for (int hit : hits) {
-			assertTrue(hit + "", hit == numRecords);
+		try {
+			getSelectedChannelsHitCount(ShipStrategyType.BROADCAST, numRecords, numberOfChannels, recordType);
+		} catch (UnsupportedOperationException ex) {
+			return;
 		}
+
+		fail("Broadcast selector does not support select channels.");
 	}
 
 	private boolean verifyWrongPartitionHashKey(int position, int fieldNum) {
@@ -256,7 +258,7 @@ public class OutputEmitterTest {
 		delegate.setInstance(record);
 
 		try {
-			selector.selectChannels(delegate);
+			selector.selectChannel(delegate);
 		} catch (NullKeyFieldException re) {
 			Assert.assertEquals(position, re.getFieldNumber());
 			return true;
@@ -283,6 +285,7 @@ public class OutputEmitterTest {
 			int numberOfChannels) {
 		final ChannelSelector selector = new OutputEmitter<>(shipStrategyType, comparator);
 		selector.setup(numberOfChannels);
+		assertEquals(shipStrategyType == ShipStrategyType.BROADCAST, selector.isBroadcast());
 		return selector;
 	}
 
@@ -303,10 +306,8 @@ public class OutputEmitterTest {
 			Record record = new Record(value);
 			delegate.setInstance(record);
 
-			int[] channels = selector.selectChannels(delegate);
-			for (int channel : channels) {
-				hits[channel]++;
-			}
+			int channel = selector.selectChannel(delegate);
+			hits[channel]++;
 		}
 		return hits;
 	}
@@ -317,10 +318,9 @@ public class OutputEmitterTest {
 			int record,
 			int numberOfChannels) {
 		serializationDelegate.setInstance(record);
-		int[] selectedChannels = selector.selectChannels(serializationDelegate);
+		int selectedChannel = selector.selectChannel(serializationDelegate);
 
-		assertTrue(selectedChannels.length == 1);
-		assertTrue(selectedChannels[0] >= 0 && selectedChannels[0] <= numberOfChannels - 1);
+		assertTrue(selectedChannel >= 0 && selectedChannel <= numberOfChannels - 1);
 	}
 
 	private static class TestIntComparator extends TypeComparator<Integer> {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java
index 0614ca1..1c81151 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java
@@ -30,19 +30,18 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 
-	private int[] returnArray;
+	/**
+	 * Note: Broadcast mode could be handled directly for all the output channels
+	 * in record writer, so it is no need to select channels via this method.
+	 */
+	@Override
+	public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
+		throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");
+	}
 
 	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record) {
-		if (returnArray != null && returnArray.length == numberOfChannels) {
-			return returnArray;
-		} else {
-			this.returnArray = new int[numberOfChannels];
-			for (int i = 0; i < numberOfChannels; i++) {
-				returnArray[i] = i;
-			}
-			return returnArray;
-		}
+	public boolean isBroadcast() {
+		return true;
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
index 73041d1..0e7e59b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
@@ -35,7 +35,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 
-	private final int[] returnArray = new int[1];
 	Partitioner<K> partitioner;
 	KeySelector<T, K> keySelector;
 
@@ -45,7 +44,7 @@ public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
 	}
 
 	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record) {
+	public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
 		K key;
 		try {
 			key = keySelector.getKey(record.getInstance().getValue());
@@ -53,9 +52,7 @@ public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
 			throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
 		}
 
-		returnArray[0] = partitioner.partition(key, numberOfChannels);
-
-		return returnArray;
+		return partitioner.partition(key, numberOfChannels);
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
index 91530f1..0fb84a4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
@@ -30,11 +30,9 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 public class ForwardPartitioner<T> extends StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 
-	private final int[] returnArray = new int[] {0};
-
 	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record) {
-		return returnArray;
+	public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
+		return 0;
 	}
 
 	public StreamPartitioner<T> copy() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
index 9f95ece..414f9b5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
@@ -30,11 +30,9 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 public class GlobalPartitioner<T> extends StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 
-	private final int[] returnArray = new int[] { 0 };
-
 	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record) {
-		return returnArray;
+	public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
+		return 0;
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
index 9c58e2a..8676ef9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
@@ -33,8 +33,6 @@ import org.apache.flink.util.Preconditions;
 public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implements ConfigurableStreamPartitioner {
 	private static final long serialVersionUID = 1L;
 
-	private final int[] returnArray = new int[1];
-
 	private final KeySelector<T, K> keySelector;
 
 	private int maxParallelism;
@@ -50,15 +48,14 @@ public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implem
 	}
 
 	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record) {
+	public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
 		K key;
 		try {
 			key = keySelector.getKey(record.getInstance().getValue());
 		} catch (Exception e) {
 			throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
 		}
-		returnArray[0] = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
-		return returnArray;
+		return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
index d74a25d..dc25837 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
@@ -33,19 +33,19 @@ import java.util.concurrent.ThreadLocalRandom;
 public class RebalancePartitioner<T> extends StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 
-	private final int[] returnArray = new int[1];
+	private int nextChannelToSendTo;
 
 	@Override
 	public void setup(int numberOfChannels) {
 		super.setup(numberOfChannels);
 
-		returnArray[0] = ThreadLocalRandom.current().nextInt(numberOfChannels);
+		nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
 	}
 
 	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record) {
-		returnArray[0] = (returnArray[0] + 1) % numberOfChannels;
-		return returnArray;
+	public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
+		nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
+		return nextChannelToSendTo;
 	}
 
 	public StreamPartitioner<T> copy() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java
index bd65d0b..7de83e7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java
@@ -48,15 +48,14 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 public class RescalePartitioner<T> extends StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 
-	private final int[] returnArray = new int[] {-1};
+	private int nextChannelToSendTo = -1;
 
 	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record) {
-		int newChannel = ++returnArray[0];
-		if (newChannel >= numberOfChannels) {
-			returnArray[0] = 0;
+	public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
+		if (++nextChannelToSendTo >= numberOfChannels) {
+			nextChannelToSendTo = 0;
 		}
-		return returnArray;
+		return nextChannelToSendTo;
 	}
 
 	public StreamPartitioner<T> copy() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
index 0cc1d87..edaf096 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
@@ -36,12 +36,9 @@ public class ShufflePartitioner<T> extends StreamPartitioner<T> {
 
 	private Random random = new Random();
 
-	private final int[] returnArray = new int[1];
-
 	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record) {
-		returnArray[0] = random.nextInt(numberOfChannels);
-		return returnArray;
+	public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
+		return random.nextInt(numberOfChannels);
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
index d023c1a..273b86d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
@@ -39,5 +39,10 @@ public abstract class StreamPartitioner<T> implements
 		this.numberOfChannels = numberOfChannels;
 	}
 
+	@Override
+	public boolean isBroadcast() {
+		return false;
+	}
+
 	public abstract StreamPartitioner<T> copy();
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a38886e..6d5ebea 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -1199,7 +1199,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		}
 
 		RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
-			new RecordWriter<>(bufferWriter, outputPartitioner, bufferTimeout, taskName);
+			RecordWriter.createRecordWriter(bufferWriter, outputPartitioner, bufferTimeout, taskName);
 		output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
 		return output;
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
index aea191a..5443934 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
@@ -18,49 +18,32 @@
 package org.apache.flink.streaming.runtime.partitioner;
 
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for {@link BroadcastPartitioner}.
  */
-public class BroadcastPartitionerTest {
+public class BroadcastPartitionerTest extends StreamPartitionerTest {
 
-	private BroadcastPartitioner<Tuple> broadcastPartitioner1;
-	private BroadcastPartitioner<Tuple> broadcastPartitioner2;
-	private BroadcastPartitioner<Tuple> broadcastPartitioner3;
-
-	private StreamRecord<Tuple> streamRecord = new StreamRecord<>(null);
-	private SerializationDelegate<StreamRecord<Tuple>> serializationDelegate = new SerializationDelegate<>(null);
-
-	@Before
-	public void setPartitioner() {
-		broadcastPartitioner1 = createBroadcastPartitioner(1);
-		broadcastPartitioner2 = createBroadcastPartitioner(2);
-		broadcastPartitioner3 = createBroadcastPartitioner(6);
+	@Override
+	public StreamPartitioner<Tuple> createPartitioner() {
+		StreamPartitioner<Tuple> partitioner = new BroadcastPartitioner<>();
+		assertTrue(partitioner.isBroadcast());
+		return partitioner;
 	}
 
 	@Test
 	public void testSelectChannels() {
-		int[] first = new int[] { 0 };
-		int[] second = new int[] { 0, 1 };
-		int[] sixth = new int[] { 0, 1, 2, 3, 4, 5 };
-
-		serializationDelegate.setInstance(streamRecord);
-
-		assertArrayEquals(first, broadcastPartitioner1.selectChannels(serializationDelegate));
-		assertArrayEquals(second, broadcastPartitioner2.selectChannels(serializationDelegate));
-		assertArrayEquals(sixth, broadcastPartitioner3.selectChannels(serializationDelegate));
-	}
+		try {
+			streamPartitioner.selectChannel(serializationDelegate);
+		} catch (UnsupportedOperationException ex) {
+			return;
+		}
 
-	private BroadcastPartitioner<Tuple> createBroadcastPartitioner(int numberOfChannels) {
-		BroadcastPartitioner<Tuple> broadcastPartitioner = new BroadcastPartitioner<>();
-		broadcastPartitioner.setup(numberOfChannels);
-		return broadcastPartitioner;
+		fail("Broadcast selector does not support select channels.");
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
index 593119a..1ee4ae9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
@@ -21,6 +21,8 @@ import org.apache.flink.api.java.tuple.Tuple;
 
 import org.junit.Test;
 
+import static org.junit.Assert.assertFalse;
+
 /**
  * Tests for {@link ForwardPartitioner}.
  */
@@ -28,7 +30,9 @@ public class ForwardPartitionerTest extends StreamPartitionerTest {
 
 	@Override
 	public StreamPartitioner<Tuple> createPartitioner() {
-		return new ForwardPartitioner<>();
+		StreamPartitioner<Tuple> partitioner = new ForwardPartitioner<>();
+		assertFalse(partitioner.isBroadcast());
+		return partitioner;
 	}
 
 	@Test
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
index 194a099..33750d3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
@@ -21,6 +21,8 @@ import org.apache.flink.api.java.tuple.Tuple;
 
 import org.junit.Test;
 
+import static org.junit.Assert.assertFalse;
+
 /**
  * Tests for {@link GlobalPartitioner}.
  */
@@ -28,7 +30,9 @@ public class GlobalPartitionerTest extends StreamPartitionerTest {
 
 	@Override
 	public StreamPartitioner<Tuple> createPartitioner() {
-		return new GlobalPartitioner<>();
+		StreamPartitioner<Tuple> partitioner = new GlobalPartitioner<>();
+		assertFalse(partitioner.isBroadcast());
+		return partitioner;
 	}
 
 	@Test
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitionerTest.java
index 65554f4..ce5de6b8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitionerTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -55,28 +54,19 @@ public class KeyGroupStreamPartitionerTest extends TestLogger {
 	}
 
 	@Test
-	public void testSelectChannelsLength() {
-		serializationDelegate1.setInstance(streamRecord1);
-
-		assertEquals(1, selectChannels(serializationDelegate1, 1).length);
-		assertEquals(1, selectChannels(serializationDelegate1, 2).length);
-		assertEquals(1, selectChannels(serializationDelegate1, 1024).length);
-	}
-
-	@Test
 	public void testSelectChannelsGrouping() {
 		serializationDelegate1.setInstance(streamRecord1);
 		serializationDelegate2.setInstance(streamRecord2);
 
-		assertArrayEquals(selectChannels(serializationDelegate1, 1), selectChannels(serializationDelegate2, 1));
-		assertArrayEquals(selectChannels(serializationDelegate1, 2), selectChannels(serializationDelegate2, 2));
-		assertArrayEquals(selectChannels(serializationDelegate1, 1024), selectChannels(serializationDelegate2, 1024));
+		assertEquals(selectChannels(serializationDelegate1, 1), selectChannels(serializationDelegate2, 1));
+		assertEquals(selectChannels(serializationDelegate1, 2), selectChannels(serializationDelegate2, 2));
+		assertEquals(selectChannels(serializationDelegate1, 1024), selectChannels(serializationDelegate2, 1024));
 	}
 
-	private int[] selectChannels(
-		SerializationDelegate<StreamRecord<Tuple2<String, Integer>>> serializationDelegate,
-		int numberOfChannels) {
+	private int selectChannels(
+			SerializationDelegate<StreamRecord<Tuple2<String, Integer>>> serializationDelegate,
+			int numberOfChannels) {
 		keyGroupPartitioner.setup(numberOfChannels);
-		return keyGroupPartitioner.selectChannels(serializationDelegate);
+		return keyGroupPartitioner.selectChannel(serializationDelegate);
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java
index 75b551c..545b464 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 
 import org.junit.Test;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -30,7 +31,9 @@ public class RebalancePartitionerTest extends StreamPartitionerTest {
 
 	@Override
 	public StreamPartitioner<Tuple> createPartitioner() {
-		return new RebalancePartitioner<>();
+		StreamPartitioner<Tuple> partitioner = new RebalancePartitioner<>();
+		assertFalse(partitioner.isBroadcast());
+		return partitioner;
 	}
 
 	@Test
@@ -38,7 +41,7 @@ public class RebalancePartitionerTest extends StreamPartitionerTest {
 		final int numberOfChannels = 3;
 		streamPartitioner.setup(numberOfChannels);
 
-		int initialChannel = selectChannelAndAssertLength();
+		int initialChannel = streamPartitioner.selectChannel(serializationDelegate);
 		assertTrue(0 <= initialChannel);
 		assertTrue(numberOfChannels > initialChannel);
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index 6d60d78..8ff47c7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -62,7 +62,9 @@ public class RescalePartitionerTest extends StreamPartitionerTest {
 
 	@Override
 	public StreamPartitioner<Tuple> createPartitioner() {
-		return new RescalePartitioner<>();
+		StreamPartitioner<Tuple> partitioner = new RescalePartitioner<>();
+		assertFalse(partitioner.isBroadcast());
+		return partitioner;
 	}
 
 	@Test
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
index 8c2c4e1..28fe7d8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 
 import org.junit.Test;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -30,7 +31,9 @@ public class ShufflePartitionerTest extends StreamPartitionerTest {
 
 	@Override
 	public StreamPartitioner<Tuple> createPartitioner() {
-		return new ShufflePartitioner<>();
+		StreamPartitioner<Tuple> partitioner = new ShufflePartitioner<>();
+		assertFalse(partitioner.isBroadcast());
+		return partitioner;
 	}
 
 	@Test
@@ -38,11 +41,11 @@ public class ShufflePartitionerTest extends StreamPartitionerTest {
 		assertSelectedChannelWithSetup(0, 1);
 
 		streamPartitioner.setup(2);
-		assertTrue(0 <= selectChannelAndAssertLength());
-		assertTrue(2 > selectChannelAndAssertLength());
+		assertTrue(0 <= streamPartitioner.selectChannel(serializationDelegate));
+		assertTrue(2 > streamPartitioner.selectChannel(serializationDelegate));
 
 		streamPartitioner.setup(1024);
-		assertTrue(0 <= selectChannelAndAssertLength());
-		assertTrue(1024 > selectChannelAndAssertLength());
+		assertTrue(0 <= streamPartitioner.selectChannel(serializationDelegate));
+		assertTrue(1024 > streamPartitioner.selectChannel(serializationDelegate));
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTest.java
index 1e72c49..4d6c67b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTest.java
@@ -43,15 +43,8 @@ public abstract class StreamPartitionerTest extends TestLogger {
 		serializationDelegate.setInstance(streamRecord);
 	}
 
-	protected int selectChannelAndAssertLength() {
-		int[] selectedChannels = streamPartitioner.selectChannels(serializationDelegate);
-		assertEquals(1, selectedChannels.length);
-
-		return selectedChannels[0];
-	}
-
 	protected void assertSelectedChannel(int expectedChannel) {
-		int actualResult = selectChannelAndAssertLength();
+		int actualResult = streamPartitioner.selectChannel(serializationDelegate);
 		assertEquals(expectedChannel, actualResult);
 	}