You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/22 08:50:36 UTC

[GitHub] pnowojski commented on a change in pull request #7146: [FLINK-10942][network, test] Deduplicate common codes in OutputEmitterTest

pnowojski commented on a change in pull request #7146: [FLINK-10942][network,test] Deduplicate common codes in OutputEmitterTest
URL: https://github.com/apache/flink/pull/7146#discussion_r235635663
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
 ##########
 @@ -48,404 +49,272 @@
 import java.io.PipedOutputStream;
 
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class OutputEmitterTest {
-	
-	
+
 	@Test
 	public void testPartitionHash() {
 		// Test for IntValue
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		final TypeComparator<Record> intComp = new RecordComparatorFactory(new int[] {0}, new Class[] {IntValue.class}).createComparator();
-		final ChannelSelector<SerializationDelegate<Record>> oe1 = new OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, intComp);
-		final SerializationDelegate<Record> delegate = new SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-		
-		int numChans = 100;
-		int numRecs = 50000;
-		int[] hit = new int[numChans];
-
-		for (int i = 0; i < numRecs; i++) {
-			IntValue k = new IntValue(i);
-			Record rec = new Record(k);
-			
-			delegate.setInstance(rec);
-			
-			int[] chans = oe1.selectChannels(delegate, hit.length);
-			for (int chan : chans) {
-				hit[chan]++;
-			}
-		}
-
-		int cnt = 0;
-		for (int aHit : hit) {
-			assertTrue(aHit > 0);
-			cnt += aHit;
-		}
-		assertTrue(cnt == numRecs);
-
+		verifyPartitionHashSelectedChannels(50000, 100, new Either.Left<>(0));
 		// Test for StringValue
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		final TypeComparator<Record> stringComp = new RecordComparatorFactory(new int[] {0}, new Class[] {StringValue.class}).createComparator();
-		final ChannelSelector<SerializationDelegate<Record>> oe2 = new OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, stringComp);
-
-		numChans = 100;
-		numRecs = 10000;
-		
-		hit = new int[numChans];
-
-		for (int i = 0; i < numRecs; i++) {
-			StringValue k = new StringValue(i + "");
-			Record rec = new Record(k);
-			delegate.setInstance(rec);
-				
-			int[] chans = oe2.selectChannels(delegate, hit.length);
-			for (int chan : chans) {
-				hit[chan]++;
-			}
-		}
+		verifyPartitionHashSelectedChannels(10000, 100, new Either.Right<>(""));
 
-		cnt = 0;
-		for (int aHit : hit) {
-			assertTrue(aHit > 0);
-			cnt += aHit;
-		}
-		assertTrue(cnt == numRecs);
-		
-		// test hash corner cases
+		// Test hash corner cases
 		final TestIntComparator testIntComp = new TestIntComparator();
-		final ChannelSelector<SerializationDelegate<Integer>> oe3 = new OutputEmitter<Integer>(ShipStrategyType.PARTITION_HASH, testIntComp);
-		final SerializationDelegate<Integer> intDel = new SerializationDelegate<Integer>(new IntSerializer());
-		
-		numChans = 100;
-		
+		final ChannelSelector<SerializationDelegate<Integer>> selector = new OutputEmitter<>(
+			ShipStrategyType.PARTITION_HASH, testIntComp);
+		final SerializationDelegate<Integer> serializationDelegate = new SerializationDelegate<>(new IntSerializer());
+
 		// MinVal hash
 
 Review comment:
   nit: I would drop the `// MinVal hash`, `// -1 hash`, `// ... hash` comments here. They don't explain anything more than the `assertPartitionHashSelectedChannels` method name already does.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services