You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2021/06/28 22:49:45 UTC

[GitHub] [gobblin] autumnust commented on a change in pull request #3323: GOBBLIN-1483: Handle null valued ConsumerRecords in Kafka Streaming Extractor

autumnust commented on a change in pull request #3323:
URL: https://github.com/apache/gobblin/pull/3323#discussion_r660164349



##########
File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
##########
@@ -350,20 +351,33 @@ public S getSchema() {
     this.readStartTime = System.nanoTime();
     long fetchStartTime = System.nanoTime();
     try {
-      while (this.messageIterator == null || !this.messageIterator.hasNext()) {
-        Long currentTime = System.currentTimeMillis();
-        //it's time to flush, so break the while loop and directly return null
-        if ((currentTime - timeOfLastFlush) > this.flushIntervalMillis) {
-          return new FlushRecordEnvelope();
+      DecodeableKafkaRecord kafkaConsumerRecord;
+      while(true) {

Review comment:
       Why do we need this additional loop given there's already a `while` to check if `messageIterator.hasNext()` returns true?  Should it be enough to examine `.next()` each time and verify if that's actually a `null` value? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org