You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2023/05/19 21:35:22 UTC

[gobblin] branch master updated: [GOBBLIN-1830] Improving Container Transition Tracking in Streaming Data Ingestion (#3693)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 85b0a1e57 [GOBBLIN-1830] Improving Container Transition Tracking in Streaming Data Ingestion (#3693)
85b0a1e57 is described below

commit 85b0a1e57402377f2baa35b9f7d0ca9a36d44a1a
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Fri May 19 14:35:15 2023 -0700

    [GOBBLIN-1830] Improving Container Transition Tracking in Streaming Data Ingestion (#3693)
    
    * address comments
    
    * use ConnectionManager when HttpClient is not closeable
    
    * [GOBBLIN-1830] Improving Container Transition Tracking in Streaming Data Ingestion
    
    * emit event with a different name
    
    * remove unnecessary log
    
    ---------
    
    Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
---
 .../extract/kafka/KafkaExtractorStatsTracker.java       | 17 +++++++++++++++++
 .../extract/kafka/KafkaStreamingExtractor.java          |  8 ++++++++
 2 files changed, 25 insertions(+)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index b1bf19788..8ee8e5e8c 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -40,6 +40,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
 import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
 import org.apache.gobblin.util.TaskEventMetadataUtils;
 
@@ -56,6 +57,7 @@ public class KafkaExtractorStatsTracker {
 
   private static final String EMPTY_STRING = "";
   private static final String GOBBLIN_KAFKA_NAMESPACE = "gobblin.kafka";
+  private static final String KAFKA_EXTRACTOR_CONTAINER_TRANSITION_EVENT_NAME = "KafkaExtractorContainerTransitionEvent";
   private static final String KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME = "KafkaExtractorTopicMetadata";
   private static final String LOW_WATERMARK = "lowWatermark";
   private static final String ACTUAL_HIGH_WATERMARK = "actualHighWatermark";
@@ -497,6 +499,21 @@ public class KafkaExtractorStatsTracker {
     }
   }
 
+  /**
+   * Emit one {@code KafkaExtractorContainerTransitionEvent} (namespace {@code gobblin.kafka}) per entry in {@code partitions}, tagged with topic name, partition id and task event metadata.
+   * @param context the {@link MetricContext} the events are submitted to
+   */
+  public void submitEventToIndicateContainerTransition(MetricContext context) {
+    for (int i = 0; i < this.partitions.size(); i++) {
+      KafkaPartition partitionKey = this.partitions.get(i);
+      GobblinEventBuilder gobblinEventBuilder = new GobblinEventBuilder(KAFKA_EXTRACTOR_CONTAINER_TRANSITION_EVENT_NAME, GOBBLIN_KAFKA_NAMESPACE);
+      gobblinEventBuilder.addMetadata(TOPIC, partitionKey.getTopicName());
+      gobblinEventBuilder.addMetadata(PARTITION, Integer.toString(partitionKey.getId()));
+      gobblinEventBuilder.addAdditionalMetadata(this.taskEventMetadataGenerator.getMetadata(workUnitState, KAFKA_EXTRACTOR_CONTAINER_TRANSITION_EVENT_NAME)); // NOTE(review): loop-invariant call, could be hoisted
+      EventSubmitter.submit(context, gobblinEventBuilder);
+    }
+  }
+
   /**
    * A helper function to merge tags for KafkaPartition. Separate into a package-private method for ease of testing.
    */
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
index 87658e09c..3fa4d4c28 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
@@ -246,6 +246,8 @@ public class KafkaStreamingExtractor<S> extends FlushingExtractor<S, DecodeableK
         state.getPropAsLong(KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES_KEY,
             DEFAULT_KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES) * 60 * 1000;
     resetExtractorStatsAndWatermarks(true);
+    //Even though we haven't start ingesting yet, emit event to indicate the container transition.
+    submitEventToIndicateContainerTransition();
 
     //Schedule a thread for reporting Kafka consumer metrics
     this.scheduledExecutorService.scheduleAtFixedRate(() -> {
@@ -271,6 +273,12 @@ public class KafkaStreamingExtractor<S> extends FlushingExtractor<S, DecodeableK
         this.workUnitState.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT, TimeUnit.MILLISECONDS.name()));
   }
 
+  private void submitEventToIndicateContainerTransition() { // no-op unless instrumentation is enabled; delegates to statsTracker
+    if (this.isInstrumentationEnabled()) {
+      this.statsTracker.submitEventToIndicateContainerTransition(getMetricContext());
+    }
+  }
+
   private Map<KafkaPartition, LongWatermark> getTopicPartitionWatermarks(List<KafkaPartition> topicPartitions) {
     List<String> topicPartitionStrings =
         topicPartitions.stream().map(topicPartition -> topicPartition.toString()).collect(Collectors.toList());