You are viewing a plain-text version of this content. The canonical link for it is "here" (the hyperlink itself is not preserved in this plain-text version).
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/09/30 18:13:12 UTC
[gobblin] branch master updated: [GOBBLIN-1716] refactor HighLevelConsumer to make consumer initialization configurable (#3570)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 71da34b5d [GOBBLIN-1716] refactor HighLevelConsumer to make consumer initializa… (#3570)
71da34b5d is described below
commit 71da34b5d42e7b2c159ef241368b644a08de875d
Author: umustafi <um...@gmail.com>
AuthorDate: Fri Sep 30 11:13:04 2022 -0700
[GOBBLIN-1716] refactor HighLevelConsumer to make consumer initializa… (#3570)
* [GOBBLIN-1716] refactor HighLevelConsumer to make consumer initialization configurable
* allow SpecStoreChangeMonitor to initialize its own partition assignments and offsets
* create a unique group_id for each host's SpecStoreChangeMonitor consumer client
* remove extra call to helper function
* rename methods and add comment
* move topic assignment and offset logic into SpecStoreChangeMonitor#assignTopicPartitions
Co-authored-by: Urmi Mustafi <um...@umustafi-mn1.linkedin.biz>
---
.../gobblin/runtime/kafka/HighLevelConsumer.java | 35 +++++++++++-------
.../gobblin/runtime/metrics/RuntimeMetrics.java | 2 +-
.../service/monitoring/SpecStoreChangeMonitor.java | 42 +++++++++++++++++++++-
3 files changed, 64 insertions(+), 15 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index c9838e05b..1ce525562 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -123,19 +123,7 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
this.numThreads = numThreads;
this.config = config.withFallback(FALLBACK);
this.gobblinKafkaConsumerClient = createConsumerClient(this.config);
- // On Partition rebalance, commit exisiting offsets and reset.
- this.gobblinKafkaConsumerClient.subscribe(this.topic, new GobblinConsumerRebalanceListener() {
- @Override
- public void onPartitionsRevoked(Collection<KafkaPartition> partitions) {
- copyAndCommit();
- partitionOffsetsToCommit.clear();
- }
-
- @Override
- public void onPartitionsAssigned(Collection<KafkaPartition> partitions) {
- // No op
- }
- });
+ assignTopicPartitions();
this.consumerExecutor = Executors.newSingleThreadScheduledExecutor(ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("HighLevelConsumerThread")));
this.queueExecutor = Executors.newFixedThreadPool(this.numThreads, ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("QueueProcessor-%d")));
this.queues = new LinkedBlockingQueue[numThreads];
@@ -164,6 +152,27 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
}
}
+ /*
+ The default implementation of this method subscribes to the given topic and uses the default Kafka logic to split
+ partitions of the topic among all consumers in the group and start consuming from the last committed offset for the
+ partition. Override this method to assign partitions and initialize offsets using different logic.
+ */
+ protected void assignTopicPartitions() {
+ // On Partition rebalance, commit existing offsets and reset.
+ this.gobblinKafkaConsumerClient.subscribe(this.topic, new GobblinConsumerRebalanceListener() {
+ @Override
+ public void onPartitionsRevoked(Collection<KafkaPartition> partitions) {
+ copyAndCommit();
+ partitionOffsetsToCommit.clear();
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<KafkaPartition> partitions) {
+ // No op
+ }
+ });
+ }
+
/**
* Called once on {@link #startUp()} to start metrics.
*/
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index 410cf98b6..d714468e4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -39,7 +39,7 @@ public class RuntimeMetrics {
public static final String GOBBLIN_SPEC_STORE_MONITOR_SUCCESSFULLY_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.successful.added.specs";
public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.failed.added.specs";
public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.deleted.specs";
- public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.unexpected.errorss";
+ public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.unexpected.errors";
// Metadata keys
public static final String TOPIC = "topic";
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
index 3cf8fd376..b9563141e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
@@ -19,7 +19,14 @@ package org.apache.gobblin.service.monitoring;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
import org.apache.commons.text.StringEscapeUtils;
@@ -27,8 +34,10 @@ import com.codahale.metrics.Meter;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
import lombok.extern.slf4j.Slf4j;
@@ -40,6 +49,10 @@ import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
/**
@@ -74,7 +87,34 @@ public class SpecStoreChangeMonitor extends HighLevelConsumer {
protected GobblinServiceJobScheduler scheduler;
public SpecStoreChangeMonitor(String topic, Config config, int numThreads) {
- super(topic, config, numThreads);
+ // Differentiate group id for each host
+ super(topic, config.withValue(GROUP_ID_KEY,
+ ConfigValueFactory.fromAnyRef(SPEC_STORE_CHANGE_MONITOR_PREFIX + UUID.randomUUID().toString())),
+ numThreads);
+ }
+
+ @Override
+ protected void assignTopicPartitions() {
+ // The consumer client will assign itself to all partitions for this topic and consume from its latest offset.
+ List<KafkaTopic> kafkaTopicList = this.getGobblinKafkaConsumerClient().getFilteredTopics(Collections.EMPTY_LIST,
+ Lists.newArrayList(Pattern.compile(this.topic)));
+
+ List<KafkaPartition> kafkaPartitions = new ArrayList();
+ for (KafkaTopic topic : kafkaTopicList) {
+ kafkaPartitions.addAll(topic.getPartitions());
+ }
+
+ Map<KafkaPartition, LongWatermark> partitionLongWatermarkMap = new HashMap<>();
+ for (KafkaPartition partition : kafkaPartitions) {
+ try {
+ partitionLongWatermarkMap.put(partition, new LongWatermark(this.getGobblinKafkaConsumerClient().getLatestOffset(partition)));
+ } catch (KafkaOffsetRetrievalFailureException e) {
+ log.warn("Failed to retrieve latest Kafka offset, consuming from beginning for partition {} due to {}",
+ partition, e);
+ partitionLongWatermarkMap.put(partition, new LongWatermark(0L));
+ }
+ }
+ this.getGobblinKafkaConsumerClient().assignAndSeek(kafkaPartitions, partitionLongWatermarkMap);
}
@Override