Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2021/06/25 18:00:49 UTC

[GitHub] [gobblin] sv2000 opened a new pull request #3323: GOBBLIN-1483: Handle null valued ConsumerRecords in Kafka Streaming Extractor

sv2000 opened a new pull request #3323:
URL: https://github.com/apache/gobblin/pull/3323


   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
       - https://issues.apache.org/jira/browse/GOBBLIN-1483
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if applicable):
   Currently, null-valued ConsumerRecords returned by the Kafka consumer are passed down the construct chain and can result in irrecoverable failures. Null-valued records should be filtered out early in the pipeline to avoid such errors.
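   A minimal sketch of the kind of early filtering described above, assuming a simplified stand-in Record type rather than the real ConsumerRecord / DecodeableKafkaRecord API (the actual change lives in KafkaStreamingExtractor's read path):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: Record stands in for a Kafka consumer record whose
// value may be null (e.g. a tombstone or an undecodable payload).
public class NullRecordFilterSketch {

  static class Record {
    final String key;
    final String value; // null values are what previously broke the pipeline
    Record(String key, String value) { this.key = key; this.value = value; }
  }

  // Drop null and null-valued records so downstream converters/writers
  // never receive a null payload.
  static List<Record> dropNullValued(List<Record> batch) {
    List<Record> filtered = new ArrayList<>();
    for (Record r : batch) {
      if (r != null && r.value != null) {
        filtered.add(r);
      }
    }
    return filtered;
  }

  public static void main(String[] args) {
    List<Record> batch = Arrays.asList(
        new Record("k1", "payload-1"),
        new Record("k2", null),        // filtered out early
        new Record("k3", "payload-3"));
    dropNullValued(batch).forEach(r -> System.out.println(r.key)); // k1, k3
  }
}
```

   Filtering at the extractor boundary keeps downstream constructs from ever seeing a payload they cannot decode.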
   
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   Added unit tests.
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [gobblin] sv2000 commented on a change in pull request #3323: GOBBLIN-1483: Handle null valued ConsumerRecords in Kafka Streaming Extractor

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #3323:
URL: https://github.com/apache/gobblin/pull/3323#discussion_r660196262



##########
File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
##########
@@ -350,20 +351,33 @@ public S getSchema() {
     this.readStartTime = System.nanoTime();
     long fetchStartTime = System.nanoTime();
     try {
-      while (this.messageIterator == null || !this.messageIterator.hasNext()) {
-        Long currentTime = System.currentTimeMillis();
-        //it's time to flush, so break the while loop and directly return null
-        if ((currentTime - timeOfLastFlush) > this.flushIntervalMillis) {
-          return new FlushRecordEnvelope();
+      DecodeableKafkaRecord kafkaConsumerRecord;
+      while(true) {

Review comment:
       Each call to consume() returns an iterator over a new batch of records. Say the returned batch is R1, R2, R3, and only R2 is null-valued. With a single while loop, we would call consume() again the moment we encounter R2, and R3 would be skipped. With the current implementation, R2 is skipped because it is null, and the next iteration of the inner loop correctly returns R3, which exits the outer while loop.
   
   In theory, we could make this work with a single while loop plus if conditions inside it to handle null-valued records as a special case, but I am not sure that would be any clearer than the current implementation.
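   For reference, a self-contained sketch of the two-loop shape described above, using plain Strings in place of Gobblin's DecodeableKafkaRecord and a stubbed consume(); the names here are illustrative, not the actual KafkaStreamingExtractor API:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Illustrative only: Strings stand in for Kafka records, and consume()
// is a stub that hands out one batch per call, like the extractor's
// underlying consumer client.
public class NullSkippingReaderSketch {

  private Iterator<String> messageIterator;
  private final Iterator<List<String>> batches;

  NullSkippingReaderSketch(Iterator<List<String>> batches) {
    this.batches = batches;
  }

  // Each call returns an iterator over a fresh batch (or null when done).
  private Iterator<String> consume() {
    return batches.hasNext() ? batches.next().iterator() : null;
  }

  // Outer loop: keep going until a usable record (or end of input) is found.
  // Inner loop: refill the iterator only when the current batch is drained,
  // so skipping a null record never discards the rest of its batch.
  public String readRecord() {
    while (true) {
      while (messageIterator == null || !messageIterator.hasNext()) {
        messageIterator = consume();
        if (messageIterator == null) {
          return null; // no more batches (the real code would flush instead)
        }
      }
      String record = messageIterator.next();
      if (record == null) {
        continue; // skip the null-valued record, keep draining this batch
      }
      return record;
    }
  }

  public static void main(String[] args) {
    Iterator<List<String>> batches =
        Arrays.asList(Arrays.asList("R1", null, "R3")).iterator();
    NullSkippingReaderSketch reader = new NullSkippingReaderSketch(batches);
    System.out.println(reader.readRecord()); // R1
    System.out.println(reader.readRecord()); // R3, not lost to an early consume()
  }
}
```

   The key point is that the iterator is only refilled once the current batch is fully drained, so skipping a null-valued record never throws away the records that follow it in the same batch.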







[GitHub] [gobblin] autumnust commented on a change in pull request #3323: GOBBLIN-1483: Handle null valued ConsumerRecords in Kafka Streaming Extractor

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #3323:
URL: https://github.com/apache/gobblin/pull/3323#discussion_r660164349



##########
File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
##########
@@ -350,20 +351,33 @@ public S getSchema() {
     this.readStartTime = System.nanoTime();
     long fetchStartTime = System.nanoTime();
     try {
-      while (this.messageIterator == null || !this.messageIterator.hasNext()) {
-        Long currentTime = System.currentTimeMillis();
-        //it's time to flush, so break the while loop and directly return null
-        if ((currentTime - timeOfLastFlush) > this.flushIntervalMillis) {
-          return new FlushRecordEnvelope();
+      DecodeableKafkaRecord kafkaConsumerRecord;
+      while(true) {

Review comment:
       Why do we need this additional loop, given there is already a `while` that checks whether `messageIterator.hasNext()` returns true? Shouldn't it be enough to examine `.next()` each time and verify whether the value is actually `null`?







[GitHub] [gobblin] asfgit closed pull request #3323: GOBBLIN-1483: Handle null valued ConsumerRecords in Kafka Streaming Extractor

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #3323:
URL: https://github.com/apache/gobblin/pull/3323


   




