Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/11 09:43:43 UTC

[GitHub] [flink] aljoscha commented on a change in pull request #12018: [FLINK-17307] Add collector to deserialize in KafkaDeserializationSchema

aljoscha commented on a change in pull request #12018:
URL: https://github.com/apache/flink/pull/12018#discussion_r422914601



##########
File path: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
##########
@@ -228,4 +226,33 @@ protected void doCommitInternalOffsetsToKafka(
 		// record the work to be committed by the main consumer thread and make sure the consumer notices that
 		consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback);
 	}
+
+	private class KafkaCollector implements Collector<T> {
+		private final Queue<T> records = new ArrayDeque<>();
+
+		private boolean endOfStreamSignalled = false;
+
+		@Override
+		public void collect(T record) {
+			// do not emit subsequent elements once the end of the stream has been reached

Review comment:
       So we're not emitting all the records of the batch after one of them signals "end of stream"? I recall that an initial version still emitted all records.
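The question above is about what happens to records that the deserializer produces in the same batch after the end-of-stream marker. Below is a minimal, self-contained sketch that contrasts the two behaviors: dropping everything after the marker, or keeping the later records of the batch. It is not the code from this PR. The `Collector` interface is a hypothetical stand-in for `org.apache.flink.util.Collector`, the `isEndOfStream` predicate stands in for `KafkaDeserializationSchema#isEndOfStream`, and the class names and the `stopAtEndOfStream` flag are illustrative assumptions. It also assumes the marker record itself is never emitted in either mode.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

public class BufferingCollectorSketch {

    /** Hypothetical stand-in for org.apache.flink.util.Collector. */
    interface Collector<T> {
        void collect(T record);
        void close();
    }

    /**
     * Buffers the records produced while deserializing one Kafka record.
     * stopAtEndOfStream = true: records after the first end-of-stream marker are dropped.
     * stopAtEndOfStream = false: later records of the batch are still buffered (assumption).
     * In both modes the marker itself is skipped and the flag is remembered.
     */
    static final class BufferingCollector<T> implements Collector<T> {
        private final Queue<T> records = new ArrayDeque<>();
        private final Predicate<T> isEndOfStream;
        private final boolean stopAtEndOfStream;
        private boolean endOfStreamSignalled = false;

        BufferingCollector(Predicate<T> isEndOfStream, boolean stopAtEndOfStream) {
            this.isEndOfStream = isEndOfStream;
            this.stopAtEndOfStream = stopAtEndOfStream;
        }

        @Override
        public void collect(T record) {
            if (stopAtEndOfStream && endOfStreamSignalled) {
                return; // end of stream already reached: drop the rest of the batch
            }
            if (isEndOfStream.test(record)) {
                endOfStreamSignalled = true;
                return; // the marker itself is not emitted
            }
            records.add(record);
        }

        @Override
        public void close() {
            // nothing to release; the fetcher drains the queue after deserialization
        }

        Queue<T> getRecords() {
            return records;
        }

        boolean isEndOfStreamSignalled() {
            return endOfStreamSignalled;
        }
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("a", "b", "EOS", "c");
        for (boolean stop : new boolean[] {true, false}) {
            BufferingCollector<String> collector = new BufferingCollector<>("EOS"::equals, stop);
            batch.forEach(collector::collect);
            System.out.println("stopAtEndOfStream=" + stop + " -> " + collector.getRecords()
                    + ", endOfStreamSignalled=" + collector.isEndOfStreamSignalled());
        }
    }
}
```

With the batch `a, b, EOS, c`, the stopping mode buffers only `a` and `b`, while the other mode also keeps `c`; both report that the end of the stream was signalled. Whichever variant the connector uses, the fetcher would then emit the buffered records and stop the partition once the flag is set.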

##########
File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
##########
@@ -108,13 +112,13 @@ public void testSkipCorruptedRecord() throws Exception {
 
 		final KafkaTopicPartitionState<Object> partitionStateHolder = fetcher.subscribedPartitionStates().get(0);
 
-		fetcher.emitRecord(1L, partitionStateHolder, 1L);
-		fetcher.emitRecord(2L, partitionStateHolder, 2L);
+		emitRecord(fetcher, 1L, partitionStateHolder, 1L);

Review comment:
       The changes in this file are purely an orthogonal refactoring, right? Could you put these in a separate commit, along with the removal of the `emitRecord()` method on the fetcher that is only used in tests?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org