You are viewing a plain-text version of this content. The canonical link for it ("here" in the original HTML message) was lost in the plain-text conversion.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/09/30 18:13:12 UTC

[gobblin] branch master updated: [GOBBLIN-1716] refactor HighLevelConsumer to make consumer initializa… (#3570)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 71da34b5d [GOBBLIN-1716] refactor HighLevelConsumer to make consumer initializa… (#3570)
71da34b5d is described below

commit 71da34b5d42e7b2c159ef241368b644a08de875d
Author: umustafi <um...@gmail.com>
AuthorDate: Fri Sep 30 11:13:04 2022 -0700

    [GOBBLIN-1716] refactor HighLevelConsumer to make consumer initializa… (#3570)
    
    * [GOBBLIN-1716] refactor HighLevelConsumer to make consumer initialization configurable
    
    * allow SpecStoreChangeMonitor to initialize partitions and offsets
    * create unique group_id for each host's consumer client monitor
    
    * remove extra call to helper function
    
    * rename methods and add comment
    
    * move topic assignment and offset logic here
    
    Co-authored-by: Urmi Mustafi <um...@umustafi-mn1.linkedin.biz>
---
 .../gobblin/runtime/kafka/HighLevelConsumer.java   | 35 +++++++++++-------
 .../gobblin/runtime/metrics/RuntimeMetrics.java    |  2 +-
 .../service/monitoring/SpecStoreChangeMonitor.java | 42 +++++++++++++++++++++-
 3 files changed, 64 insertions(+), 15 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index c9838e05b..1ce525562 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -123,19 +123,7 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
     this.numThreads = numThreads;
     this.config = config.withFallback(FALLBACK);
     this.gobblinKafkaConsumerClient = createConsumerClient(this.config);
-    // On Partition rebalance, commit exisiting offsets and reset.
-    this.gobblinKafkaConsumerClient.subscribe(this.topic, new GobblinConsumerRebalanceListener() {
-      @Override
-      public void onPartitionsRevoked(Collection<KafkaPartition> partitions) {
-        copyAndCommit();
-        partitionOffsetsToCommit.clear();
-      }
-
-      @Override
-      public void onPartitionsAssigned(Collection<KafkaPartition> partitions) {
-        // No op
-      }
-    });
+    assignTopicPartitions();
     this.consumerExecutor = Executors.newSingleThreadScheduledExecutor(ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("HighLevelConsumerThread")));
     this.queueExecutor = Executors.newFixedThreadPool(this.numThreads, ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("QueueProcessor-%d")));
     this.queues = new LinkedBlockingQueue[numThreads];
@@ -164,6 +152,27 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
     }
   }
 
+  /**
+   * Assigns partitions of {@code topic} to the consumer client. By default this subscribes to the topic so Kafka's group
+   * logic splits its partitions among the group's consumers, each starting at its last committed offset. Override to
+   * assign partitions/offsets differently. NOTE: called from the constructor, so overrides must not use subclass fields.
+   */
+  protected void assignTopicPartitions() {
+    // On partition rebalance (revocation), commit existing offsets and reset the pending-commit map.
+    this.gobblinKafkaConsumerClient.subscribe(this.topic, new GobblinConsumerRebalanceListener() {
+      @Override
+      public void onPartitionsRevoked(Collection<KafkaPartition> partitions) {
+        copyAndCommit();
+        partitionOffsetsToCommit.clear();
+      }
+
+      @Override
+      public void onPartitionsAssigned(Collection<KafkaPartition> partitions) {
+        // No op: nothing extra to do on assignment; consumption starts from the committed offsets.
+      }
+    });
+  }
+
   /**
    * Called once on {@link #startUp()} to start metrics.
    */
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index 410cf98b6..d714468e4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -39,7 +39,7 @@ public class RuntimeMetrics {
   public static final String GOBBLIN_SPEC_STORE_MONITOR_SUCCESSFULLY_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.successful.added.specs";
   public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.failed.added.specs";
   public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.deleted.specs";
-  public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.unexpected.errorss";
+  public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.unexpected.errors";
 
   // Metadata keys
   public static final String TOPIC = "topic";
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
index 3cf8fd376..b9563141e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
@@ -19,7 +19,14 @@ package org.apache.gobblin.service.monitoring;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
 import org.apache.commons.text.StringEscapeUtils;
 
@@ -27,8 +34,10 @@ import com.codahale.metrics.Meter;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Lists;
 import com.google.inject.Inject;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -40,6 +49,10 @@ import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
 import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
 
 
 /**
@@ -74,7 +87,34 @@ public class SpecStoreChangeMonitor extends HighLevelConsumer {
   protected GobblinServiceJobScheduler scheduler;
 
   public SpecStoreChangeMonitor(String topic, Config config, int numThreads) {
-    super(topic, config, numThreads);
+    // Override GROUP_ID_KEY with a random UUID-suffixed group id so each instance (host) is in its own consumer group.
+    super(topic, config.withValue(GROUP_ID_KEY,
+        ConfigValueFactory.fromAnyRef(SPEC_STORE_CHANGE_MONITOR_PREFIX + UUID.randomUUID().toString())),
+        numThreads);
+  }
+
+  @Override
+  protected void assignTopicPartitions() {
+    // Assign this client to ALL partitions of exactly this.topic; quote the name so '.' etc. are not regex wildcards.
+    List<KafkaTopic> kafkaTopicList = this.getGobblinKafkaConsumerClient().getFilteredTopics(Collections.emptyList(),
+        Lists.newArrayList(Pattern.compile(Pattern.quote(this.topic))));
+    // Flatten partitions of every matched topic ('kafkaTopic' avoids shadowing the field this.topic).
+    List<KafkaPartition> kafkaPartitions = new ArrayList<>();
+    for (KafkaTopic kafkaTopic : kafkaTopicList) {
+      kafkaPartitions.addAll(kafkaTopic.getPartitions());
+    }
+    // Start each partition at its latest offset; on lookup failure fall back to 0 (TODO confirm reset if 0 expired).
+    Map<KafkaPartition, LongWatermark> partitionLongWatermarkMap = new HashMap<>();
+    for (KafkaPartition partition : kafkaPartitions) {
+      try {
+        partitionLongWatermarkMap.put(partition, new LongWatermark(this.getGobblinKafkaConsumerClient().getLatestOffset(partition)));
+      } catch (KafkaOffsetRetrievalFailureException e) {
+        // Pass the exception as the trailing argument (no placeholder) so SLF4J logs its stack trace.
+        log.warn("Failed to retrieve latest Kafka offset, consuming from beginning for partition {}", partition, e);
+        partitionLongWatermarkMap.put(partition, new LongWatermark(0L));
+      }
+    }
+    this.getGobblinKafkaConsumerClient().assignAndSeek(kafkaPartitions, partitionLongWatermarkMap);
   }
 
   @Override