You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/07/22 03:09:38 UTC

[GitHub] [flink] tzulitai commented on a change in pull request #12850: [FLINK-18483][kinesis] Test coverage improvements for FlinkKinesisConsumer/ShardConsumer

tzulitai commented on a change in pull request #12850:
URL: https://github.com/apache/flink/pull/12850#discussion_r458507237



##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
##########
@@ -51,148 +61,123 @@
 
 	@Test
 	public void testMetricsReporting() {
-		StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);
-
-		LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
-		subscribedShardsStateUnderTest.add(
-			new KinesisStreamShardState(
-				KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
-				fakeToBeConsumedShard,
-				new SequenceNumber("fakeStartingState")));
+		KinesisProxyInterface kinesis = FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(500, 5, 500);
 
-		TestSourceContext<String> sourceContext = new TestSourceContext<>();
-
-		KinesisDeserializationSchemaWrapper<String> deserializationSchema = new KinesisDeserializationSchemaWrapper<>(
-			new SimpleStringSchema());
-		TestableKinesisDataFetcher<String> fetcher =
-			new TestableKinesisDataFetcher<>(
-				Collections.singletonList("fakeStream"),
-				sourceContext,
-				new Properties(),
-				deserializationSchema,
-				10,
-				2,
-				new AtomicReference<>(),
-				subscribedShardsStateUnderTest,
-				KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
-				Mockito.mock(KinesisProxyInterface.class));
+		ShardMetricsReporter metrics = assertNumberOfMessagesReceivedFromKinesis(500, kinesis, fakeSequenceNumber());
+		assertEquals(500, metrics.getMillisBehindLatest());
+		assertEquals(10000, metrics.getMaxNumberOfRecordsPerFetch());
+	}
 
-		ShardMetricsReporter shardMetricsReporter = new ShardMetricsReporter();
-		long millisBehindLatest = 500L;
-		new ShardConsumer<>(
-			fetcher,
-			0,
-			subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
-			subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
-			FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9, millisBehindLatest),
-			shardMetricsReporter,
-			deserializationSchema)
-			.run();
+	@Test
+	public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithStartingSequenceNumber() throws Exception {
+		KinesisProxyInterface kinesis = spy(FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9, 500L));
 
-		// the millisBehindLatest metric should have been reported
-		assertEquals(millisBehindLatest, shardMetricsReporter.getMillisBehindLatest());
+		assertNumberOfMessagesReceivedFromKinesis(1000, kinesis, fakeSequenceNumber());
+		verify(kinesis).getShardIterator(any(), eq("AFTER_SEQUENCE_NUMBER"), eq("fakeStartingState"));
 	}
 
 	@Test
-	public void testCorrectNumOfCollectedRecordsAndUpdatedState() {
-		StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);
+	public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithStartingSequenceSentinelTimestamp() throws Exception {
+		String format = "yyyy-MM-dd'T'HH:mm";
+		String timestamp = "2020-07-02T09:14";
+		Date expectedTimestamp = new SimpleDateFormat(format).parse(timestamp);
 
-		LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
-		subscribedShardsStateUnderTest.add(
-			new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
-				fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
+		Properties consumerProperties = new Properties();
+		consumerProperties.setProperty(STREAM_INITIAL_TIMESTAMP, timestamp);
+		consumerProperties.setProperty(STREAM_TIMESTAMP_DATE_FORMAT, format);
+		SequenceNumber sequenceNumber = SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get();
 
-		TestSourceContext<String> sourceContext = new TestSourceContext<>();
+		KinesisProxyInterface kinesis = spy(FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(10, 1, 0));
 
-		KinesisDeserializationSchemaWrapper<String> deserializationSchema = new KinesisDeserializationSchemaWrapper<>(
-			new SimpleStringSchema());
-		TestableKinesisDataFetcher<String> fetcher =
-			new TestableKinesisDataFetcher<>(
-				Collections.singletonList("fakeStream"),
-				sourceContext,
-				new Properties(),
-				deserializationSchema,
-				10,
-				2,
-				new AtomicReference<>(),
-				subscribedShardsStateUnderTest,
-				KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
-				Mockito.mock(KinesisProxyInterface.class));
+		assertNumberOfMessagesReceivedFromKinesis(10, kinesis, sequenceNumber, consumerProperties);
+		verify(kinesis).getShardIterator(any(), eq("AT_TIMESTAMP"), eq(expectedTimestamp));
+	}
 
-		int shardIndex = fetcher.registerNewSubscribedShardState(subscribedShardsStateUnderTest.get(0));
-		new ShardConsumer<>(
-			fetcher,
-			shardIndex,
-			subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
-			subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
-			FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9, 500L),
-			new ShardMetricsReporter(),
-			deserializationSchema)
-			.run();
+	@Test
+	public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithStartingSequenceSentinelEarliest() throws Exception {
+		SequenceNumber sequenceNumber = SENTINEL_EARLIEST_SEQUENCE_NUM.get();
 
-		assertEquals(1000, sourceContext.getCollectedOutputs().size());
-		assertEquals(
-			SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
-			subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());
+		KinesisProxyInterface kinesis = spy(FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(50, 2, 0));
+
+		assertNumberOfMessagesReceivedFromKinesis(50, kinesis, sequenceNumber);
+		verify(kinesis).getShardIterator(any(), eq("TRIM_HORIZON"), eq(null));
 	}
 
 	@Test
 	public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithUnexpectedExpiredIterator() {
-		StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);
+		KinesisProxyInterface kinesis = FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(1000, 9, 7, 500L);
 
-		LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
-		subscribedShardsStateUnderTest.add(
-			new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
-				fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
+		// Get a total of 1000 records with 9 getRecords() calls,
+		// and the 7th getRecords() call will encounter an unexpected expired shard iterator
+		assertNumberOfMessagesReceivedFromKinesis(1000, kinesis, fakeSequenceNumber());
+	}
 
-		TestSourceContext<String> sourceContext = new TestSourceContext<>();
+	@Test
+	public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads() {
+		Properties consumerProperties = new Properties();
+		consumerProperties.setProperty("flink.shard.adaptivereads", "true");

Review comment:
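       Is there an existing constant for the `"flink.shard.adaptivereads"` property key? If so, please use it here instead of the hard-coded string literal.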




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org