You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/03/15 06:00:45 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-689] catch unchecked exceptions in KafkaSource

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8965666  [GOBBLIN-689] catch unchecked exceptions in KafkaSource
8965666 is described below

commit 89656669ab8cbc1df12624ac189d8d98510b056c
Author: Arjun <ab...@linkedin.com>
AuthorDate: Thu Mar 14 23:00:39 2019 -0700

    [GOBBLIN-689] catch unchecked exceptions in KafkaSource
    
    Closes #2561 from arjun4084346/kafkaDataLossFix
---
 .../apache/gobblin/source/extractor/extract/kafka/KafkaSource.java  | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 1fb1acc..3e5dad6 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -354,9 +354,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     List<WorkUnit> workUnits = Lists.newArrayList();
     for (KafkaPartition partition : topic.getPartitions()) {
       WorkUnit workUnit = getWorkUnitForTopicPartition(partition, state, topicSpecificState);
-      this.partitionsToBeProcessed.add(partition);
       if (workUnit != null) {
-
         // For disqualified topics, for each of its workunits set the high watermark to be the same
         // as the low watermark, so that it will be skipped.
         if (!topicQualified) {
@@ -365,6 +363,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
         workUnits.add(workUnit);
       }
     }
+    this.partitionsToBeProcessed.addAll(topic.getPartitions());
     return workUnits;
   }
 
@@ -393,8 +392,9 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
       offsets.setOffsetFetchEpochTime(System.currentTimeMillis());
       offsets.setEarliestOffset(this.kafkaConsumerClient.get().getEarliestOffset(partition));
       offsets.setLatestOffset(this.kafkaConsumerClient.get().getLatestOffset(partition));
-    } catch (KafkaOffsetRetrievalFailureException e) {
+    } catch (Throwable t) {
       failedToGetKafkaOffsets = true;
+      LOG.error("Caught error in creating work unit for {}", partition, t);
     }
 
     long previousOffset = 0;