You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2023/05/19 21:35:22 UTC
[gobblin] branch master updated: [GOBBLIN-1830] Improving Container Transition Tracking in Streaming Data Ingestion (#3693)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 85b0a1e57 [GOBBLIN-1830] Improving Container Transition Tracking in Streaming Data Ingestion (#3693)
85b0a1e57 is described below
commit 85b0a1e57402377f2baa35b9f7d0ca9a36d44a1a
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Fri May 19 14:35:15 2023 -0700
[GOBBLIN-1830] Improving Container Transition Tracking in Streaming Data Ingestion (#3693)
* address comments
* use connectionmanager when httpclient is not closeable
* [GOBBLIN-1830]Improving Container Transition Tracking in Streaming Data Ingestion
* emit event with a different name
* remove unnecessary log
---------
Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
---
.../extract/kafka/KafkaExtractorStatsTracker.java | 17 +++++++++++++++++
.../extract/kafka/KafkaStreamingExtractor.java | 8 ++++++++
2 files changed, 25 insertions(+)
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index b1bf19788..8ee8e5e8c 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -40,6 +40,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
import org.apache.gobblin.util.TaskEventMetadataUtils;
@@ -56,6 +57,7 @@ public class KafkaExtractorStatsTracker {
private static final String EMPTY_STRING = "";
private static final String GOBBLIN_KAFKA_NAMESPACE = "gobblin.kafka";
+ private static final String KAFKA_EXTRACTOR_CONTAINER_TRANSITION_EVENT_NAME = "KafkaExtractorContainerTransitionEvent";
private static final String KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME = "KafkaExtractorTopicMetadata";
private static final String LOW_WATERMARK = "lowWatermark";
private static final String ACTUAL_HIGH_WATERMARK = "actualHighWatermark";
@@ -497,6 +499,21 @@ public class KafkaExtractorStatsTracker {
}
}
+ /**
+ * Emit one tracking event per entry in {@code this.partitions}, carrying its topic name and partition id, to be consumed by a monitoring application.
+ * @param context the current {@link MetricContext} that each event is submitted to
+ */
+ public void submitEventToIndicateContainerTransition(MetricContext context) {
+ for (int i = 0; i < this.partitions.size(); i++) { // a separate event is built and submitted for every partition
+ KafkaPartition partitionKey = this.partitions.get(i);
+ GobblinEventBuilder gobblinEventBuilder = new GobblinEventBuilder(KAFKA_EXTRACTOR_CONTAINER_TRANSITION_EVENT_NAME, GOBBLIN_KAFKA_NAMESPACE);
+ gobblinEventBuilder.addMetadata(TOPIC, partitionKey.getTopicName());
+ gobblinEventBuilder.addMetadata(PARTITION, Integer.toString(partitionKey.getId())); // partition id is recorded as a string
+ gobblinEventBuilder.addAdditionalMetadata(this.taskEventMetadataGenerator.getMetadata(workUnitState, KAFKA_EXTRACTOR_CONTAINER_TRANSITION_EVENT_NAME)); // extra metadata supplied by taskEventMetadataGenerator for this event name
+ EventSubmitter.submit(context, gobblinEventBuilder);
+ }
+ }
+
/**
* A helper function to merge tags for KafkaPartition. Separate into a package-private method for ease of testing.
*/
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
index 87658e09c..3fa4d4c28 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
@@ -246,6 +246,8 @@ public class KafkaStreamingExtractor<S> extends FlushingExtractor<S, DecodeableK
state.getPropAsLong(KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES_KEY,
DEFAULT_KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES) * 60 * 1000;
resetExtractorStatsAndWatermarks(true);
+ //Even though we haven't started ingesting yet, emit an event to indicate the container transition.
+ submitEventToIndicateContainerTransition();
//Schedule a thread for reporting Kafka consumer metrics
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
@@ -271,6 +273,12 @@ public class KafkaStreamingExtractor<S> extends FlushingExtractor<S, DecodeableK
this.workUnitState.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT, TimeUnit.MILLISECONDS.name()));
}
+ private void submitEventToIndicateContainerTransition() { // delegates to statsTracker with this extractor's metric context; does nothing unless instrumentation is enabled
+ if (this.isInstrumentationEnabled()) {
+ this.statsTracker.submitEventToIndicateContainerTransition(getMetricContext());
+ }
+ }
+
private Map<KafkaPartition, LongWatermark> getTopicPartitionWatermarks(List<KafkaPartition> topicPartitions) {
List<String> topicPartitionStrings =
topicPartitions.stream().map(topicPartition -> topicPartition.toString()).collect(Collectors.toList());