You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/26 11:49:41 UTC

[GitHub] pnowojski closed pull request #7146: [FLINK-10942][network, test] Deduplicate common codes in OutputEmitterTest

pnowojski closed pull request #7146: [FLINK-10942][network,test] Deduplicate common codes in OutputEmitterTest
URL: https://github.com/apache/flink/pull/7146
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
index 204e3f02c7c..5c7ed3ad148 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
@@ -35,11 +35,11 @@
 import org.apache.flink.types.DeserializationException;
 import org.apache.flink.types.DoubleValue;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.KeyFieldOutOfBoundsException;
 import org.apache.flink.types.NullKeyFieldException;
 import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
 
+import org.apache.flink.types.Value;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -48,404 +48,269 @@
 import java.io.PipedOutputStream;
 
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class OutputEmitterTest {
-	
-	
+
 	@Test
 	public void testPartitionHash() {
 		// Test for IntValue
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		final TypeComparator<Record> intComp = new RecordComparatorFactory(new int[] {0}, new Class[] {IntValue.class}).createComparator();
-		final ChannelSelector<SerializationDelegate<Record>> oe1 = new OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, intComp);
-		final SerializationDelegate<Record> delegate = new SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-		
-		int numChans = 100;
-		int numRecs = 50000;
-		int[] hit = new int[numChans];
-
-		for (int i = 0; i < numRecs; i++) {
-			IntValue k = new IntValue(i);
-			Record rec = new Record(k);
-			
-			delegate.setInstance(rec);
-			
-			int[] chans = oe1.selectChannels(delegate, hit.length);
-			for (int chan : chans) {
-				hit[chan]++;
-			}
-		}
-
-		int cnt = 0;
-		for (int aHit : hit) {
-			assertTrue(aHit > 0);
-			cnt += aHit;
-		}
-		assertTrue(cnt == numRecs);
-
+		verifyPartitionHashSelectedChannels(50000, 100, RecordType.INTEGER);
 		// Test for StringValue
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		final TypeComparator<Record> stringComp = new RecordComparatorFactory(new int[] {0}, new Class[] {StringValue.class}).createComparator();
-		final ChannelSelector<SerializationDelegate<Record>> oe2 = new OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, stringComp);
+		verifyPartitionHashSelectedChannels(10000, 100, RecordType.STRING);
 
-		numChans = 100;
-		numRecs = 10000;
-		
-		hit = new int[numChans];
-
-		for (int i = 0; i < numRecs; i++) {
-			StringValue k = new StringValue(i + "");
-			Record rec = new Record(k);
-			delegate.setInstance(rec);
-				
-			int[] chans = oe2.selectChannels(delegate, hit.length);
-			for (int chan : chans) {
-				hit[chan]++;
-			}
-		}
-
-		cnt = 0;
-		for (int aHit : hit) {
-			assertTrue(aHit > 0);
-			cnt += aHit;
-		}
-		assertTrue(cnt == numRecs);
-		
-		// test hash corner cases
+		// Test hash corner cases
 		final TestIntComparator testIntComp = new TestIntComparator();
-		final ChannelSelector<SerializationDelegate<Integer>> oe3 = new OutputEmitter<Integer>(ShipStrategyType.PARTITION_HASH, testIntComp);
-		final SerializationDelegate<Integer> intDel = new SerializationDelegate<Integer>(new IntSerializer());
-		
-		numChans = 100;
-		
-		// MinVal hash
-		intDel.setInstance(Integer.MIN_VALUE);
-		int[] chans = oe3.selectChannels(intDel, numChans);
-		assertTrue(chans.length == 1);
-		assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
-		
-		// -1 hash
-		intDel.setInstance(-1);
-		chans = oe3.selectChannels(intDel, hit.length);
-		assertTrue(chans.length == 1);
-		assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
-		
-		// 0 hash
-		intDel.setInstance(0);
-		chans = oe3.selectChannels(intDel, hit.length);
-		assertTrue(chans.length == 1);
-		assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
-		
-		// 1 hash
-		intDel.setInstance(1);
-		chans = oe3.selectChannels(intDel, hit.length);
-		assertTrue(chans.length == 1);
-		assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
-		
-		// MaxVal hash
-		intDel.setInstance(Integer.MAX_VALUE);
-		chans = oe3.selectChannels(intDel, hit.length);
-		assertTrue(chans.length == 1);
-		assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
+		final ChannelSelector<SerializationDelegate<Integer>> selector = new OutputEmitter<>(
+			ShipStrategyType.PARTITION_HASH, testIntComp);
+		final SerializationDelegate<Integer> serializationDelegate = new SerializationDelegate<>(new IntSerializer());
+
+		assertPartitionHashSelectedChannels(selector, serializationDelegate, Integer.MIN_VALUE, 100);
+		assertPartitionHashSelectedChannels(selector, serializationDelegate, -1, 100);
+		assertPartitionHashSelectedChannels(selector, serializationDelegate, 0, 100);
+		assertPartitionHashSelectedChannels(selector, serializationDelegate, 1, 100);
+		assertPartitionHashSelectedChannels(selector, serializationDelegate, Integer.MAX_VALUE, 100);
 	}
 
 	@Test
 	public void testForward() {
-		// Test for IntValue
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		final TypeComparator<Record> intComp = new RecordComparatorFactory(new int[] {0}, new Class[] {IntValue.class}).createComparator();
-		final ChannelSelector<SerializationDelegate<Record>> oe1 = new OutputEmitter<Record>(ShipStrategyType.FORWARD, intComp);
-		final SerializationDelegate<Record> delegate = new SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
+		final int numChannels = 100;
 
-		int numChannels = 100;
+		// Test for IntValue
 		int numRecords = 50000 + numChannels / 2;
-
-		int[] hit = new int[numChannels];
-
-		for (int i = 0; i < numRecords; i++) {
-			IntValue k = new IntValue(i);
-			Record rec = new Record(k);
-			delegate.setInstance(rec);
-
-			int[] chans = oe1.selectChannels(delegate, hit.length);
-			for (int chan : chans) {
-				hit[chan]++;
-			}
-		}
-
-		assertTrue(hit[0] == numRecords);
-		for (int i = 1; i < hit.length; i++) {
-			assertTrue(hit[i] == 0);
-		}
+		verifyForwardSelectedChannels(numRecords, numChannels, RecordType.INTEGER);
 
 		// Test for StringValue
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		final TypeComparator<Record> stringComp = new RecordComparatorFactory(new int[] {0}, new Class[] {StringValue.class}).createComparator();
-		final ChannelSelector<SerializationDelegate<Record>> oe2 = new OutputEmitter<Record>(ShipStrategyType.FORWARD, stringComp);
-
-		numChannels = 100;
 		numRecords = 10000 + numChannels / 2;
-
-		hit = new int[numChannels];
-
-		for (int i = 0; i < numRecords; i++) {
-			StringValue k = new StringValue(i + "");
-			Record rec = new Record(k);
-			delegate.setInstance(rec);
-
-			int[] chans = oe2.selectChannels(delegate, hit.length);
-			for (int chan : chans) {
-				hit[chan]++;
-			}
-		}
-
-		assertTrue(hit[0] == numRecords);
-		for (int i = 1; i < hit.length; i++) {
-			assertTrue(hit[i] == 0);
-		}
+		verifyForwardSelectedChannels(numRecords, numChannels, RecordType.STRING);
 	}
 
 	@Test
 	public void testForcedRebalance() {
-		// Test for IntValue
-		int numChannels = 100;
-		int toTaskIndex = numChannels * 6/7;
+		final int numChannels = 100;
+		int toTaskIndex = numChannels * 6 / 7;
 		int fromTaskIndex = toTaskIndex + numChannels;
 		int extraRecords = numChannels / 3;
 		int numRecords = 50000 + extraRecords;
+		final SerializationDelegate<Record> delegate = new SerializationDelegate<>(
+			new RecordSerializerFactory().getSerializer());
+		final ChannelSelector<SerializationDelegate<Record>> selector = new OutputEmitter<>(
+			ShipStrategyType.PARTITION_FORCED_REBALANCE, fromTaskIndex);
 
-		final ChannelSelector<SerializationDelegate<Record>> oe1 = new OutputEmitter<Record>(ShipStrategyType.PARTITION_FORCED_REBALANCE, fromTaskIndex);
-		final SerializationDelegate<Record> delegate = new SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-
-		int[] hit = new int[numChannels];
-
-		for (int i = 0; i < numRecords; i++) {
-			IntValue k = new IntValue(i);
-			Record rec = new Record(k);
-			delegate.setInstance(rec);
-
-			int[] chans = oe1.selectChannels(delegate, hit.length);
-			for (int chan : chans) {
-				hit[chan]++;
-			}
-		}
-
-		int cnt = 0;
-		for (int i = 0; i < hit.length; i++) {
-			if (toTaskIndex <= i || i < toTaskIndex+extraRecords-numChannels) {
-				assertTrue(hit[i] == (numRecords/numChannels)+1);
+		// Test for IntValue
+		int[] hits = getSelectedChannelsHitCount(selector, delegate, RecordType.INTEGER, numRecords, numChannels);
+		int totalHitCount = 0;
+		for (int i = 0; i < hits.length; i++) {
+			if (toTaskIndex <= i || i < toTaskIndex+extraRecords - numChannels) {
+				assertTrue(hits[i] == (numRecords / numChannels) + 1);
 			} else {
-				assertTrue(hit[i] == numRecords/numChannels);
+				assertTrue(hits[i] == numRecords/numChannels);
 			}
-			cnt += hit[i];
+			totalHitCount += hits[i];
 		}
-		assertTrue(cnt == numRecords);
+		assertTrue(totalHitCount == numRecords);
 
-		// Test for StringValue
-		numChannels = 100;
 		toTaskIndex = numChannels / 5;
 		fromTaskIndex = toTaskIndex + 2 * numChannels;
-		extraRecords = numChannels * 2/9;
+		extraRecords = numChannels * 2 / 9;
 		numRecords = 10000 + extraRecords;
 
-		final ChannelSelector<SerializationDelegate<Record>> oe2 = new OutputEmitter<Record>(ShipStrategyType.PARTITION_FORCED_REBALANCE, fromTaskIndex);
-
-		hit = new int[numChannels];
-
-		for (int i = 0; i < numRecords; i++) {
-			StringValue k = new StringValue(i + "");
-			Record rec = new Record(k);
-			delegate.setInstance(rec);
-
-			int[] chans = oe2.selectChannels(delegate, hit.length);
-			for (int chan : chans) {
-				hit[chan]++;
-			}
-		}
-
-		cnt = 0;
-		for (int i = 0; i < hit.length; i++) {
-			if (toTaskIndex <= i && i < toTaskIndex+extraRecords) {
-				assertTrue(hit[i] == (numRecords/numChannels)+1);
+		// Test for StringValue
+		final ChannelSelector<SerializationDelegate<Record>> selector2 = new OutputEmitter<>(
+			ShipStrategyType.PARTITION_FORCED_REBALANCE, fromTaskIndex);
+		hits = getSelectedChannelsHitCount(selector2, delegate, RecordType.STRING, numRecords, numChannels);
+		totalHitCount = 0;
+		for (int i = 0; i < hits.length; i++) {
+			if (toTaskIndex <= i && i < toTaskIndex + extraRecords) {
+				assertTrue(hits[i] == (numRecords / numChannels) + 1);
 			} else {
-				assertTrue(hit[i] == numRecords/numChannels);
+				assertTrue(hits[i] == numRecords / numChannels);
 			}
-			cnt += hit[i];
+			totalHitCount += hits[i];
 		}
-		assertTrue(cnt == numRecords);
+		assertTrue(totalHitCount == numRecords);
 	}
 	
 	@Test
 	public void testBroadcast() {
 		// Test for IntValue
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		final TypeComparator<Record> intComp = new RecordComparatorFactory(new int[] {0}, new Class[] {IntValue.class}).createComparator();
-		final ChannelSelector<SerializationDelegate<Record>> oe1 = new OutputEmitter<Record>(ShipStrategyType.BROADCAST, intComp);
-		final SerializationDelegate<Record> delegate = new SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-		
-		int numChannels = 100;
-		int numRecords = 50000;
-		
-		int[] hit = new int[numChannels];
-
-		for (int i = 0; i < numRecords; i++) {
-			IntValue k = new IntValue(i);
-			Record rec = new Record(k);
-			delegate.setInstance(rec);
-			
-			int[] chans = oe1.selectChannels(delegate, hit.length);
-			for (int chan : chans) {
-				hit[chan]++;
-			}
-		}
-
-		for (int aHit : hit) {
-			assertTrue(aHit + "", aHit == numRecords);
-		}
-		
+		verifyBroadcastSelectedChannels(100, 50000, RecordType.INTEGER);
 		// Test for StringValue
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		final TypeComparator<Record> stringComp = new RecordComparatorFactory(new int[] {0}, new Class[] {StringValue.class}).createComparator();
-		final ChannelSelector<SerializationDelegate<Record>> oe2 = new OutputEmitter<Record>(ShipStrategyType.BROADCAST, stringComp);
-
-		numChannels = 100;
-		numRecords = 5000;
-		
-		hit = new int[numChannels];
-
-		for (int i = 0; i < numRecords; i++) {
-			StringValue k = new StringValue(i + "");
-			Record rec = new Record(k);
-			delegate.setInstance(rec);
-				
-			int[] chans = oe2.selectChannels(delegate, hit.length);
-			for (int chan : chans) {
-				hit[chan]++;
-			}
-		}
-
-		for (int aHit : hit) {
-			assertTrue(aHit + "", aHit == numRecords);
-		}
+		verifyBroadcastSelectedChannels(100, 50000, RecordType.STRING);
 	}
 	
 	@Test
 	public void testMultiKeys() {
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		final TypeComparator<Record> multiComp = new RecordComparatorFactory(new int[] {0,1,3}, new Class[] {IntValue.class, StringValue.class, DoubleValue.class}).createComparator();
-		final ChannelSelector<SerializationDelegate<Record>> oe1 = new OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, multiComp);
-		final SerializationDelegate<Record> delegate = new SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
+		final TypeComparator<Record> multiComp = new RecordComparatorFactory(
+			new int[] {0,1, 3}, new Class[] {IntValue.class, StringValue.class, DoubleValue.class}).createComparator();
+		final ChannelSelector<SerializationDelegate<Record>> selector = new OutputEmitter<>(
+			ShipStrategyType.PARTITION_HASH, multiComp);
+		final SerializationDelegate<Record> delegate = new SerializationDelegate<>(new RecordSerializerFactory().getSerializer());
 		
 		int numChannels = 100;
 		int numRecords = 5000;
-		
-		int[] hit = new int[numChannels];
-
+		int[] hits = new int[numChannels];
 		for (int i = 0; i < numRecords; i++) {
-			Record rec = new Record(4);
-			rec.setField(0, new IntValue(i));
-			rec.setField(1, new StringValue("AB"+i+"CD"+i));
-			rec.setField(3, new DoubleValue(i*3.141d));
-			delegate.setInstance(rec);
-			
-			int[] chans = oe1.selectChannels(delegate, hit.length);
-			for (int chan : chans) {
-				hit[chan]++;
+			Record record = new Record(4);
+			record.setField(0, new IntValue(i));
+			record.setField(1, new StringValue("AB" + i + "CD" + i));
+			record.setField(3, new DoubleValue(i * 3.141d));
+			delegate.setInstance(record);
+
+			int[] channels = selector.selectChannels(delegate, hits.length);
+			for (int channel : channels) {
+				hits[channel]++;
 			}
 		}
 
-		int cnt = 0;
-		for (int aHit : hit) {
-			assertTrue(aHit > 0);
-			cnt += aHit;
+		int totalHitCount = 0;
+		for (int hit : hits) {
+			assertTrue(hit > 0);
+			totalHitCount += hit;
 		}
-		assertTrue(cnt == numRecords);
-		
+		assertTrue(totalHitCount == numRecords);
 	}
 	
 	@Test
 	public void testMissingKey() {
-		// Test for IntValue
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		final TypeComparator<Record> intComp = new RecordComparatorFactory(new int[] {1}, new Class[] {IntValue.class}).createComparator();
-		final ChannelSelector<SerializationDelegate<Record>> oe1 = new OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, intComp);
-		final SerializationDelegate<Record> delegate = new SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-		
-		Record rec = new Record(0);
-		rec.setField(0, new IntValue(1));
-		delegate.setInstance(rec);
-		
-		try {
-			oe1.selectChannels(delegate, 100);
-		} catch (KeyFieldOutOfBoundsException re) {
-			Assert.assertEquals(1, re.getFieldNumber());
-			return;
+		if (!verifyWrongPartitionHashKey(1, 0)) {
+			Assert.fail("Expected a KeyFieldOutOfBoundsException.");
 		}
-		Assert.fail("Expected a KeyFieldOutOfBoundsException.");
 	}
 	
 	@Test
 	public void testNullKey() {
-		// Test for IntValue
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		final TypeComparator<Record> intComp = new RecordComparatorFactory(new int[] {0}, new Class[] {IntValue.class}).createComparator();
-		final ChannelSelector<SerializationDelegate<Record>> oe1 = new OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, intComp);
-		final SerializationDelegate<Record> delegate = new SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-		
-		Record rec = new Record(2);
-		rec.setField(1, new IntValue(1));
-		delegate.setInstance(rec);
-
-		try {
-			oe1.selectChannels(delegate, 100);
-		} catch (NullKeyFieldException re) {
-			Assert.assertEquals(0, re.getFieldNumber());
-			return;
+		if (!verifyWrongPartitionHashKey(0, 1)) {
+			Assert.fail("Expected a NullKeyFieldException.");
 		}
-		Assert.fail("Expected a NullKeyFieldException.");
 	}
 	
 	@Test
-	public void testWrongKeyClass() {
-		
+	public void testWrongKeyClass() throws Exception {
 		// Test for IntValue
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		final TypeComparator<Record> doubleComp = new RecordComparatorFactory(new int[] {0}, new Class[] {DoubleValue.class}).createComparator();
-		final ChannelSelector<SerializationDelegate<Record>> oe1 = new OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, doubleComp);
-		final SerializationDelegate<Record> delegate = new SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-		
-		
-		;
-		
-		Record rec = null;
-		
-		try {
-			PipedInputStream pipedInput = new PipedInputStream(1024*1024);
-			
-			DataInputView in = new DataInputViewStreamWrapper(pipedInput);
-			DataOutputView out = new DataOutputViewStreamWrapper(new PipedOutputStream(pipedInput));
-			
-			rec = new Record(1);
-			rec.setField(0, new IntValue());
-			
-			rec.write(out);
-			rec = new Record();
-			rec.read(in);
-		} catch (IOException e) {
-			fail("Test erroneous");
-		}
+		final TypeComparator<Record> doubleComp = new RecordComparatorFactory(
+			new int[] {0}, new Class[] {DoubleValue.class}).createComparator();
+		final ChannelSelector<SerializationDelegate<Record>> selector = new OutputEmitter<>(
+			ShipStrategyType.PARTITION_HASH, doubleComp);
+		final SerializationDelegate<Record> delegate = new SerializationDelegate<>(new RecordSerializerFactory().getSerializer());
+
+		PipedInputStream pipedInput = new PipedInputStream(1024 * 1024);
+		DataInputView in = new DataInputViewStreamWrapper(pipedInput);
+		DataOutputView out = new DataOutputViewStreamWrapper(new PipedOutputStream(pipedInput));
+
+		Record record = new Record(1);
+		record.setField(0, new IntValue());
+		record.write(out);
+		record = new Record();
+		record.read(in);
 
 		try {
-			delegate.setInstance(rec);
-			oe1.selectChannels(delegate, 100);
+			delegate.setInstance(record);
+			selector.selectChannels(delegate, 100);
 		} catch (DeserializationException re) {
 			return;
 		}
 		Assert.fail("Expected a NullKeyFieldException.");
 	}
-	
-	@SuppressWarnings({"serial", "rawtypes"})
+
+	private void verifyPartitionHashSelectedChannels(int numRecords, int numChannels, Enum recordType) {
+		int[] hits = getSelectedChannelsHitCount(ShipStrategyType.PARTITION_HASH, numRecords, numChannels, recordType);
+
+		int totalHitCount = 0;
+		for (int hit : hits) {
+			assertTrue(hit > 0);
+			totalHitCount += hit;
+		}
+		assertTrue(totalHitCount == numRecords);
+	}
+
+	private void assertPartitionHashSelectedChannels(
+			ChannelSelector selector,
+			SerializationDelegate<Integer> serializationDelegate,
+			int record,
+			int numChannels) {
+		serializationDelegate.setInstance(record);
+		int[] selectedChannels = selector.selectChannels(serializationDelegate, numChannels);
+
+		assertTrue(selectedChannels.length == 1);
+		assertTrue(selectedChannels[0] >= 0 && selectedChannels[0] <= numChannels - 1);
+	}
+
+	private void verifyForwardSelectedChannels(int numRecords, int numChannels, Enum recordType) {
+		int[] hits = getSelectedChannelsHitCount(ShipStrategyType.FORWARD, numRecords, numChannels, recordType);
+
+		assertTrue(hits[0] == numRecords);
+		for (int i = 1; i < hits.length; i++) {
+			assertTrue(hits[i] == 0);
+		}
+	}
+
+	private void verifyBroadcastSelectedChannels(int numRecords, int numChannels, Enum recordType) {
+		int[] hits = getSelectedChannelsHitCount(ShipStrategyType.BROADCAST, numRecords, numChannels, recordType);
+
+		for (int hit : hits) {
+			assertTrue(hit + "", hit == numRecords);
+		}
+	}
+
+	private boolean verifyWrongPartitionHashKey(int position, int fieldNum) {
+		final TypeComparator<Record> comparator = new RecordComparatorFactory(
+			new int[] {position}, new Class[] {IntValue.class}).createComparator();
+		final ChannelSelector<SerializationDelegate<Record>> selector = new OutputEmitter<>(
+			ShipStrategyType.PARTITION_HASH, comparator);
+		final SerializationDelegate<Record> delegate = new SerializationDelegate<>(new RecordSerializerFactory().getSerializer());
+
+		Record record = new Record(2);
+		record.setField(fieldNum, new IntValue(1));
+		delegate.setInstance(record);
+
+		try {
+			selector.selectChannels(delegate, 100);
+		} catch (NullKeyFieldException re) {
+			Assert.assertEquals(position, re.getFieldNumber());
+			return true;
+		}
+		return false;
+	}
+
+	private int[] getSelectedChannelsHitCount(
+			ShipStrategyType shipStrategyType,
+			int numRecords,
+			int numChannels,
+			Enum recordType) {
+		final TypeComparator<Record> comparator = new RecordComparatorFactory(
+			new int[] {0}, new Class[] {recordType == RecordType.INTEGER ? IntValue.class : StringValue.class}).createComparator();
+		final ChannelSelector<SerializationDelegate<Record>> selector = new OutputEmitter<>(shipStrategyType, comparator);
+		final SerializationDelegate<Record> delegate = new SerializationDelegate<>(new RecordSerializerFactory().getSerializer());
+
+		return getSelectedChannelsHitCount(selector, delegate, recordType, numRecords, numChannels);
+	}
+
+	private int[] getSelectedChannelsHitCount(
+			ChannelSelector<SerializationDelegate<Record>> selector,
+			SerializationDelegate<Record> delegate,
+			Enum recordType,
+			int numRecords,
+			int numChannels) {
+		int[] hits = new int[numChannels];
+		Value value;
+		for (int i = 0; i < numRecords; i++) {
+			if (recordType == RecordType.INTEGER) {
+				value = new IntValue(i);
+			} else {
+				value = new StringValue(i + "");
+			}
+			Record record = new Record(value);
+			delegate.setInstance(record);
+
+			int[] channels = selector.selectChannels(delegate, hits.length);
+			for (int channel : channels) {
+				hits[channel]++;
+			}
+		}
+		return hits;
+	}
+
 	private static class TestIntComparator extends TypeComparator<Integer> {
 		private TypeComparator[] comparators = new TypeComparator[]{new IntComparator(true)};
 
@@ -517,4 +382,9 @@ public int extractKeys(Object record, Object[] target, int index) {
 			return comparators;
 		}
 	}
+
+	private enum RecordType {
+		STRING,
+		INTEGER
+	}
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services