Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/11 11:23:44 UTC

[GitHub] [flink] dawidwys commented on a change in pull request #12018: [FLINK-17307] Add collector to deserialize in KafkaDeserializationSchema

dawidwys commented on a change in pull request #12018:
URL: https://github.com/apache/flink/pull/12018#discussion_r422970769



##########
File path: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
##########
@@ -228,4 +226,33 @@ protected void doCommitInternalOffsetsToKafka(
 		// record the work to be committed by the main consumer thread and make sure the consumer notices that
 		consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback);
 	}
+
+	private class KafkaCollector implements Collector<T> {
+		private final Queue<T> records = new ArrayDeque<>();
+
+		private boolean endOfStreamSignalled = false;
+
+		@Override
+		public void collect(T record) {
+		// do not emit subsequent elements once the end of the stream has been reached

Review comment:
       Actually, that is the behaviour I had from the very beginning. Do you think it makes more sense to emit everything up to, but excluding, the end-of-stream record?
   
   Side note: I find this method (`isEndOfStream`) very confusing. I just realized that there is no cross-partition alignment for it. The whole task will be brought down as soon as any of the assigned partitions signals the end of the stream, irrespective of the state of the remaining partitions. Honestly, I'd be in favour of dropping this method at some point in the future.
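   
   For context, a minimal sketch of how the rest of this collector plausibly fits together. The `deserializer` field (the fetcher's `KafkaDeserializationSchema<T>`), the `getRecords()` accessor, and the empty `close()` are assumptions for illustration, not a verbatim copy of the PR:

```java
private class KafkaCollector implements Collector<T> {
	private final Queue<T> records = new ArrayDeque<>();

	private boolean endOfStreamSignalled = false;

	@Override
	public void collect(T record) {
		// once the first end-of-stream record is seen, the flag flips and that
		// record plus every subsequent element is dropped rather than buffered
		if (endOfStreamSignalled || deserializer.isEndOfStream(record)) {
			endOfStreamSignalled = true;
		} else {
			records.add(record);
		}
	}

	// assumed accessor: drained by the fetch loop after each deserialize() call
	Queue<T> getRecords() {
		return records;
	}

	boolean isEndOfStreamSignalled() {
		return endOfStreamSignalled;
	}

	@Override
	public void close() {
	}
}
```

Under this reading, the end-of-stream record itself is never emitted, which is exactly the behaviour questioned above.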
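To make the side note concrete, here is a hypothetical `KafkaDeserializationSchema` (the class name and the "EOS" marker are made up for this example) showing how a single partition can terminate the whole source task:

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical schema for illustration: treats the literal string "EOS" as an end marker.
public class StringWithEndMarkerSchema implements KafkaDeserializationSchema<String> {

	@Override
	public String deserialize(ConsumerRecord<byte[], byte[]> record) {
		return new String(record.value(), StandardCharsets.UTF_8);
	}

	@Override
	public boolean isEndOfStream(String nextElement) {
		// Returning true stops the whole source subtask: there is no
		// cross-partition alignment, so the first partition that delivers
		// "EOS" ends consumption for every partition assigned to this subtask.
		return "EOS".equals(nextElement);
	}

	@Override
	public TypeInformation<String> getProducedType() {
		return Types.STRING;
	}
}
```

Partitions that have not yet reached their marker are simply cut off, which is the missing alignment the side note complains about.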



