Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2021/06/29 00:23:24 UTC

[GitHub] [gobblin] sv2000 commented on a change in pull request #3323: GOBBLIN-1483: Handle null valued ConsumerRecords in Kafka Streaming Extractor

sv2000 commented on a change in pull request #3323:
URL: https://github.com/apache/gobblin/pull/3323#discussion_r660196262



##########
File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
##########
@@ -350,20 +351,33 @@ public S getSchema() {
     this.readStartTime = System.nanoTime();
     long fetchStartTime = System.nanoTime();
     try {
-      while (this.messageIterator == null || !this.messageIterator.hasNext()) {
-        Long currentTime = System.currentTimeMillis();
-        //it's time to flush, so break the while loop and directly return null
-        if ((currentTime - timeOfLastFlush) > this.flushIntervalMillis) {
-          return new FlushRecordEnvelope();
+      DecodeableKafkaRecord kafkaConsumerRecord;
+      while(true) {

Review comment:
       Each call to consume() returns an iterator over a new batch of records. Suppose the returned records are R1, R2, R3, and only R2 is null-valued. With a single while loop, we would call consume() again the moment we encounter R2, replacing the iterator and skipping R3. With the current implementation, R2 is skipped because it is null, and the next iteration of the outer while loop returns R3 from the same iterator, which exits the loop.
   
   In theory, a single while loop could work, with if conditions inside it to handle null-valued records as a special case. I am not sure that would be clearer than the current implementation.
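
A minimal sketch of the R1, R2, R3 scenario above, written against plain `java.util` iterators rather than the Gobblin classes. Everything in it is an assumption made for illustration: `NullSkippingReader`, its in-memory batch queue, the stand-in `consume()`, and the two read methods are hypothetical, and returning `null` when no batches remain stands in for the flush check in the real extractor.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the extractor; not the actual KafkaStreamingExtractor code.
class NullSkippingReader {
  private final Deque<List<String>> batches;
  private Iterator<String> messageIterator;

  NullSkippingReader(List<List<String>> batches) {
    this.batches = new ArrayDeque<>(batches);
  }

  // Stand-in for consume(): every call returns an iterator over a NEW batch.
  private Iterator<String> consume() {
    return batches.poll().iterator();
  }

  // Nested loops: the inner loop only refills an exhausted iterator,
  // the outer loop skips null-valued records within the current batch.
  String readNested() {
    while (true) {
      while (messageIterator == null || !messageIterator.hasNext()) {
        if (batches.isEmpty()) {
          return null; // assumption: stands in for the flush/termination path
        }
        messageIterator = consume();
      }
      String record = messageIterator.next();
      if (record != null) {
        return record;
      }
    }
  }

  // Naive single loop: a null record is treated like an exhausted iterator,
  // so consume() replaces the iterator and the rest of the batch is lost.
  String readSingleLoopNaive() {
    String record = null;
    while (messageIterator == null || !messageIterator.hasNext()
        || (record = messageIterator.next()) == null) {
      if (batches.isEmpty()) {
        return null;
      }
      messageIterator = consume();
    }
    return record;
  }

  static List<String> drainNested(List<List<String>> batches) {
    NullSkippingReader reader = new NullSkippingReader(batches);
    List<String> out = new ArrayList<>();
    for (String r = reader.readNested(); r != null; r = reader.readNested()) {
      out.add(r);
    }
    return out;
  }

  static List<String> drainNaive(List<List<String>> batches) {
    NullSkippingReader reader = new NullSkippingReader(batches);
    List<String> out = new ArrayList<>();
    for (String r = reader.readSingleLoopNaive(); r != null; r = reader.readSingleLoopNaive()) {
      out.add(r);
    }
    return out;
  }

  public static void main(String[] args) {
    List<List<String>> batches =
        Arrays.asList(Arrays.asList("R1", null, "R3"), Arrays.asList("R4"));
    System.out.println(drainNested(batches)); // [R1, R3, R4]
    System.out.println(drainNaive(batches));  // [R1, R4]
  }
}
```

In this sketch the naive variant loses R3 because the null R2 triggers a fresh consume(). A single loop could avoid that by branching on the null case separately, at the cost of the extra conditions mentioned above.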



