You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/03/15 06:00:45 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-689] catch unchecked exceptions in KafkaSource
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8965666 [GOBBLIN-689] catch unchecked exceptions in KafkaSource
8965666 is described below
commit 89656669ab8cbc1df12624ac189d8d98510b056c
Author: Arjun <ab...@linkedin.com>
AuthorDate: Thu Mar 14 23:00:39 2019 -0700
[GOBBLIN-689] catch unchecked exceptions in KafkaSource
Closes #2561 from arjun4084346/kafkaDataLossFix
---
.../apache/gobblin/source/extractor/extract/kafka/KafkaSource.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 1fb1acc..3e5dad6 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -354,9 +354,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
List<WorkUnit> workUnits = Lists.newArrayList();
for (KafkaPartition partition : topic.getPartitions()) {
WorkUnit workUnit = getWorkUnitForTopicPartition(partition, state, topicSpecificState);
- this.partitionsToBeProcessed.add(partition);
if (workUnit != null) {
-
// For disqualified topics, for each of its workunits set the high watermark to be the same
// as the low watermark, so that it will be skipped.
if (!topicQualified) {
@@ -365,6 +363,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
workUnits.add(workUnit);
}
}
+ this.partitionsToBeProcessed.addAll(topic.getPartitions());
return workUnits;
}
@@ -393,8 +392,9 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
offsets.setOffsetFetchEpochTime(System.currentTimeMillis());
offsets.setEarliestOffset(this.kafkaConsumerClient.get().getEarliestOffset(partition));
offsets.setLatestOffset(this.kafkaConsumerClient.get().getLatestOffset(partition));
- } catch (KafkaOffsetRetrievalFailureException e) {
+ } catch (Throwable t) {
failedToGetKafkaOffsets = true;
+ LOG.error("Caught error in creating work unit for {}", partition, t);
}
long previousOffset = 0;