Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/06 21:31:56 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-945] Refactor Kafka extractor statistics tracking to allow co…
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8c7ff20 [GOBBLIN-945] Refactor Kafka extractor statistics tracking to allow co…
8c7ff20 is described below
commit 8c7ff20666f6f059bc1bd41e2b8a32e13d298f34
Author: sv2000 <su...@gmail.com>
AuthorDate: Wed Nov 6 13:31:49 2019 -0800
[GOBBLIN-945] Refactor Kafka extractor statistics tracking to allow co…
Closes #2795 from sv2000/statsTracker
---
.../kafka/client/Kafka08ConsumerClient.java | 18 +-
.../kafka/client/GobblinKafkaConsumerClient.java | 20 ++
.../extract/kafka/KafkaAvroExtractor.java | 6 +-
.../extractor/extract/kafka/KafkaExtractor.java | 265 +++---------------
.../extract/kafka/KafkaExtractorStatsTracker.java | 307 +++++++++++++++++++++
.../extractor/extract/kafka/KafkaPartition.java | 13 +-
.../extractor/extract/kafka/KafkaSource.java | 18 +-
.../kafka/KafkaExtractorStatsTrackerTest.java | 137 +++++++++
8 files changed, 533 insertions(+), 251 deletions(-)
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/client/Kafka08ConsumerClient.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/client/Kafka08ConsumerClient.java
index df08ed9..2ed5b70 100644
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/client/Kafka08ConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/client/Kafka08ConsumerClient.java
@@ -24,7 +24,14 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
-import java.util.regex.Pattern;
+
+import com.google.common.base.Function;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.net.HostAndPort;
+import com.typesafe.config.Config;
import kafka.api.PartitionFetchInfo;
import kafka.api.PartitionOffsetRequestInfo;
@@ -41,19 +48,10 @@ import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import lombok.extern.slf4j.Slf4j;
-import com.google.common.base.Function;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.net.HostAndPort;
-import com.typesafe.config.Config;
-
import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.DatasetFilterUtils;
/**
* A {@link GobblinKafkaConsumerClient} that uses kafka 08 scala consumer client. All the code has been moved from the
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
index f94530b..4026f33 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.kafka.client;
import java.io.Closeable;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -73,6 +74,25 @@ public interface GobblinKafkaConsumerClient extends Closeable {
public long getLatestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException;
/**
+ * Get the latest available offset for a {@link Collection} of {@link KafkaPartition}s. NOTE: the default implementation
+ * is inefficient, i.e. it makes a separate getLatestOffset() call for every {@link KafkaPartition}. Individual implementations
+ * of {@link GobblinKafkaConsumerClient} should override this method to use more advanced APIs of the underlying KafkaConsumer
+ * to retrieve the latest offsets for a collection of partitions in a single call.
+ *
+ * @param partitions the partitions for which the latest offsets are retrieved
+ * @return a map from each partition to its latest available offset
+ *
+ * @throws KafkaOffsetRetrievalFailureException if the underlying kafka-client does not support retrieving the latest offset
+ */
+ public default Map<KafkaPartition, Long> getLatestOffsets(Collection<KafkaPartition> partitions)
+ throws KafkaOffsetRetrievalFailureException {
+ Map<KafkaPartition, Long> offsetMap = Maps.newHashMap();
+ for (KafkaPartition partition: partitions) {
+ offsetMap.put(partition, getLatestOffset(partition));
+ }
+ return offsetMap;
+ }
+
+ /**
* API to consume records from Kafka starting from <code>nextOffset</code> till <code>maxOffset</code>.
* If <code>nextOffset</code> is greater than <code>maxOffset</code>, returns null.
* <code>nextOffset</code>
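A client backed by the new Kafka consumer can satisfy this override with a single endOffsets() round trip (available in kafka-clients 0.10.1+). A minimal sketch of the batching under that assumption; the OffsetLookups helper below is hypothetical and not part of this commit:

    import java.util.Collection;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    import com.google.common.collect.Maps;

    import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;

    public final class OffsetLookups {
      private OffsetLookups() {}

      // One endOffsets() round trip instead of one getLatestOffset() call per partition.
      public static Map<KafkaPartition, Long> batchedLatestOffsets(Consumer<?, ?> consumer,
          Collection<KafkaPartition> partitions) {
        Map<TopicPartition, KafkaPartition> byTopicPartition = Maps.newHashMapWithExpectedSize(partitions.size());
        for (KafkaPartition partition : partitions) {
          byTopicPartition.put(new TopicPartition(partition.getTopicName(), partition.getId()), partition);
        }
        Map<KafkaPartition, Long> latestOffsets = Maps.newHashMapWithExpectedSize(partitions.size());
        for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(byTopicPartition.keySet()).entrySet()) {
          latestOffsets.put(byTopicPartition.get(entry.getKey()), entry.getValue());
        }
        return latestOffsets;
      }
    }

An implementation's getLatestOffsets() could then delegate to this helper with its own consumer instance.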
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaAvroExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaAvroExtractor.java
index 4547cfe..bbe69b5 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaAvroExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaAvroExtractor.java
@@ -19,8 +19,6 @@ package org.apache.gobblin.source.extractor.extract.kafka;
import java.io.IOException;
-import lombok.extern.slf4j.Slf4j;
-
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData.Record;
@@ -31,6 +29,8 @@ import org.apache.avro.io.Decoder;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.kafka.client.ByteArrayBasedKafkaRecord;
import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
@@ -116,7 +116,7 @@ public abstract class KafkaAvroExtractor<K> extends KafkaExtractor<Schema, Gener
record = convertRecord(record);
return record;
} catch (IOException e) {
- log.error(String.format("Error during decoding record for partition %s: ", this.getCurrentPartition()));
+ log.error(String.format("Error during decoding record for partition %s: ", getCurrentPartition()));
throw e;
}
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index 3ca24bb..5ea38b7 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -20,19 +20,15 @@ package org.apache.gobblin.source.extractor.extract.kafka;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.gobblin.runtime.JobShutdownException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+
+import lombok.Getter;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
@@ -43,7 +39,7 @@ import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory;
import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.JobShutdownException;
import org.apache.gobblin.source.extractor.DataRecordException;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.extract.EventBasedExtractor;
@@ -61,25 +57,19 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaExtractor.class);
+
+ private final ClassAliasResolver<GobblinKafkaConsumerClientFactory> kafkaConsumerClientResolver;
+ private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
+
+ private Iterator<KafkaConsumerRecord> messageIterator = null;
+ @Getter
+ private int currentPartitionIdx = INITIAL_PARTITION_IDX;
+ @Getter
+ private long readStartTime;
+
protected static final int INITIAL_PARTITION_IDX = -1;
- protected static final Long MAX_LOG_DECODING_ERRORS = 5L;
- // Constants for event submission
- public static final String TOPIC = "topic";
- public static final String PARTITION = "partition";
- public static final String LOW_WATERMARK = "lowWatermark";
- public static final String ACTUAL_HIGH_WATERMARK = "actualHighWatermark";
- public static final String EXPECTED_HIGH_WATERMARK = "expectedHighWatermark";
- public static final String ELAPSED_TIME = "elapsedTime";
- public static final String PROCESSED_RECORD_COUNT = "processedRecordCount";
- public static final String UNDECODABLE_MESSAGE_COUNT = "undecodableMessageCount";
- public static final String PARTITION_TOTAL_SIZE = "partitionTotalSize";
- public static final String AVG_RECORD_PULL_TIME = "avgRecordPullTime";
- public static final String READ_RECORD_TIME = "readRecordTime";
- public static final String DECODE_RECORD_TIME = "decodeRecordTime";
- public static final String FETCH_MESSAGE_BUFFER_TIME = "fetchMessageBufferTime";
- public static final String GOBBLIN_KAFKA_NAMESPACE = "gobblin.kafka";
- public static final String KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME = "KafkaExtractorTopicMetadata";
+ protected static final Long MAX_LOG_DECODING_ERRORS = 5L;
protected final WorkUnitState workUnitState;
protected final String topicName;
@@ -87,35 +77,11 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
protected final MultiLongWatermark lowWatermark;
protected final MultiLongWatermark highWatermark;
protected final MultiLongWatermark nextWatermark;
+ protected final KafkaExtractorStatsTracker statsTracker;
protected final GobblinKafkaConsumerClient kafkaConsumerClient;
- private final ClassAliasResolver<GobblinKafkaConsumerClientFactory> kafkaConsumerClientResolver;
-
- protected final Map<KafkaPartition, Long> decodingErrorCount;
- private final Map<KafkaPartition, Double> avgMillisPerRecord;
- private final Map<KafkaPartition, Long> avgRecordSizes;
- private final Map<KafkaPartition, Long> elapsedTime;
- private final Map<KafkaPartition, Long> processedRecordCount;
- private final Map<KafkaPartition, Long> partitionTotalSize;
- private final Map<KafkaPartition, Long> decodeRecordTime;
- private final Map<KafkaPartition, Long> fetchMessageBufferTime;
- private final Map<KafkaPartition, Long> readRecordTime;
- private final Map<KafkaPartition, Long> startFetchEpochTime;
- private final Map<KafkaPartition, Long> stopFetchEpochTime;
-
- private final Set<Integer> errorPartitions;
- private int undecodableMessageCount = 0;
- private Iterator<KafkaConsumerRecord> messageIterator = null;
- private int currentPartitionIdx = INITIAL_PARTITION_IDX;
- private long currentPartitionRecordCount = 0;
- private long currentPartitionTotalSize = 0;
- private long currentPartitionDecodeRecordTime = 0;
- private long currentPartitionFetchMessageBufferTime = 0;
- private long currentPartitionReadRecordTime = 0;
protected D currentPartitionLastSuccessfulRecord = null;
- private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
-
public KafkaExtractor(WorkUnitState state) {
super(state);
this.workUnitState = state;
@@ -135,20 +101,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
-
- this.decodingErrorCount = Maps.newHashMap();
- this.avgMillisPerRecord = Maps.newHashMapWithExpectedSize(this.partitions.size());
- this.avgRecordSizes = Maps.newHashMapWithExpectedSize(this.partitions.size());
- this.elapsedTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
- this.processedRecordCount = Maps.newHashMapWithExpectedSize(this.partitions.size());
- this.partitionTotalSize = Maps.newHashMapWithExpectedSize(this.partitions.size());
- this.decodeRecordTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
- this.fetchMessageBufferTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
- this.readRecordTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
- this.startFetchEpochTime = Maps.newHashMapWithExpectedSize(this.partitions.size());
- this.stopFetchEpochTime= Maps.newHashMapWithExpectedSize(this.partitions.size());
-
- this.errorPartitions = Sets.newHashSet();
+ this.statsTracker = new KafkaExtractorStatsTracker(state, partitions);
// The actual high watermark starts with the low watermark
this.workUnitState.setActualHighWatermark(this.lowWatermark);
@@ -161,6 +114,13 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
return tags;
}
+ protected KafkaPartition getCurrentPartition() {
+ Preconditions.checkElementIndex(this.currentPartitionIdx, this.partitions.size(),
+ "KafkaExtractor has finished extracting all partitions. There's no current partition.");
+ return this.partitions.get(this.currentPartitionIdx);
+ }
+
+
/**
* Return the next decodable record from the current partition. If the current partition has no more
* decodable record, move on to the next partition. If all partitions have been processed, return null.
@@ -172,7 +132,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
return null;
}
- long readStartTime = System.nanoTime();
+ this.readStartTime = System.nanoTime();
while (!allPartitionsFinished()) {
if (currentPartitionFinished()) {
@@ -183,7 +143,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
try {
long fetchStartTime = System.nanoTime();
this.messageIterator = fetchNextMessageBuffer();
- this.currentPartitionFetchMessageBufferTime += System.nanoTime() - fetchStartTime;
+ this.statsTracker.onFetchNextMessageBuffer(this.currentPartitionIdx, fetchStartTime);
} catch (Exception e) {
LOG.error(String.format("Failed to fetch next message buffer for partition %s. Will skip this partition.",
getCurrentPartition()), e);
@@ -216,25 +176,18 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
D record = decodeKafkaMessage(nextValidMessage);
- this.currentPartitionDecodeRecordTime += System.nanoTime() - decodeStartTime;
- this.currentPartitionRecordCount++;
- this.currentPartitionTotalSize += nextValidMessage.getValueSizeInBytes();
- this.currentPartitionReadRecordTime += System.nanoTime() - readStartTime;
+ this.statsTracker.onDecodeableRecord(this.currentPartitionIdx, readStartTime, decodeStartTime, nextValidMessage.getValueSizeInBytes());
this.currentPartitionLastSuccessfulRecord = record;
return record;
} catch (Throwable t) {
- this.errorPartitions.add(this.currentPartitionIdx);
- this.undecodableMessageCount++;
+ statsTracker.onUndecodeableRecord(this.currentPartitionIdx);
if (shouldLogError()) {
LOG.error(String.format("A record from partition %s cannot be decoded.", getCurrentPartition()), t);
}
- incrementErrorCount();
}
}
}
LOG.info("Finished pulling topic " + this.topicName);
-
- this.currentPartitionReadRecordTime += System.nanoTime() - readStartTime;
return null;
}
@@ -275,7 +228,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
if (this.currentPartitionIdx == INITIAL_PARTITION_IDX) {
return true;
} else if (this.nextWatermark.get(this.currentPartitionIdx) >= this.highWatermark.get(this.currentPartitionIdx)) {
- LOG.info("Finished pulling partition " + this.getCurrentPartition());
+ LOG.info("Finished pulling partition " + getCurrentPartition());
return true;
} else {
return false;
@@ -291,60 +244,33 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
LOG.info("Pulling topic " + this.topicName);
this.currentPartitionIdx = 0;
} else {
- updateStatisticsForCurrentPartition();
+ this.statsTracker.updateStatisticsForCurrentPartition(currentPartitionIdx, readStartTime, getLastSuccessfulRecordHeaderTimestamp());
this.currentPartitionIdx++;
- this.currentPartitionRecordCount = 0;
- this.currentPartitionTotalSize = 0;
- this.currentPartitionDecodeRecordTime = 0;
- this.currentPartitionFetchMessageBufferTime = 0;
- this.currentPartitionReadRecordTime = 0;
this.currentPartitionLastSuccessfulRecord = null;
}
this.messageIterator = null;
if (this.currentPartitionIdx < this.partitions.size()) {
- LOG.info(String.format("Pulling partition %s from offset %d to %d, range=%d", this.getCurrentPartition(),
+ LOG.info(String.format("Pulling partition %s from offset %d to %d, range=%d", getCurrentPartition(),
this.nextWatermark.get(this.currentPartitionIdx), this.highWatermark.get(this.currentPartitionIdx),
this.highWatermark.get(this.currentPartitionIdx) - this.nextWatermark.get(this.currentPartitionIdx)));
switchMetricContextToCurrentPartition();
}
if (!allPartitionsFinished()) {
- this.startFetchEpochTime.put(this.getCurrentPartition(), System.currentTimeMillis());
+ this.statsTracker.resetStartFetchEpochTime(currentPartitionIdx);
}
}
- protected void updateStatisticsForCurrentPartition() {
- long stopFetchEpochTime = System.currentTimeMillis();
-
- if (!allPartitionsFinished()) {
- this.stopFetchEpochTime.put(this.getCurrentPartition(), stopFetchEpochTime);
- }
-
- if (this.currentPartitionRecordCount != 0) {
- long currentPartitionFetchDuration =
- stopFetchEpochTime - this.startFetchEpochTime.get(this.getCurrentPartition());
- double avgMillisForCurrentPartition =
- (double) currentPartitionFetchDuration / (double) this.currentPartitionRecordCount;
- this.avgMillisPerRecord.put(this.getCurrentPartition(), avgMillisForCurrentPartition);
-
- long avgRecordSize = this.currentPartitionTotalSize / this.currentPartitionRecordCount;
- this.avgRecordSizes.put(this.getCurrentPartition(), avgRecordSize);
-
- this.elapsedTime.put(this.getCurrentPartition(), currentPartitionFetchDuration);
- this.processedRecordCount.put(this.getCurrentPartition(), this.currentPartitionRecordCount);
- this.partitionTotalSize.put(this.getCurrentPartition(), this.currentPartitionTotalSize);
- this.decodeRecordTime.put(this.getCurrentPartition(), this.currentPartitionDecodeRecordTime);
- this.fetchMessageBufferTime.put(this.getCurrentPartition(), this.currentPartitionFetchMessageBufferTime);
- this.readRecordTime.put(this.getCurrentPartition(), this.currentPartitionReadRecordTime);
- }
+ protected long getLastSuccessfulRecordHeaderTimestamp() {
+ return 0;
}
private void switchMetricContextToCurrentPartition() {
if (this.currentPartitionIdx >= this.partitions.size()) {
return;
}
- int currentPartitionId = this.getCurrentPartition().getId();
+ int currentPartitionId = getCurrentPartition().getId();
switchMetricContext(Lists.<Tag<?>> newArrayList(new Tag<>("kafka_partition", currentPartitionId)));
}
@@ -354,22 +280,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
}
private boolean shouldLogError() {
- return !this.decodingErrorCount.containsKey(getCurrentPartition())
- || this.decodingErrorCount.get(getCurrentPartition()) <= MAX_LOG_DECODING_ERRORS;
- }
-
- private void incrementErrorCount() {
- if (this.decodingErrorCount.containsKey(getCurrentPartition())) {
- this.decodingErrorCount.put(getCurrentPartition(), this.decodingErrorCount.get(getCurrentPartition()) + 1);
- } else {
- this.decodingErrorCount.put(getCurrentPartition(), 1L);
- }
- }
-
- protected KafkaPartition getCurrentPartition() {
- Preconditions.checkElementIndex(this.currentPartitionIdx, this.partitions.size(),
- "KafkaExtractor has finished extracting all partitions. There's no current partition.");
- return this.partitions.get(this.currentPartitionIdx);
+ return this.statsTracker.getDecodingErrorCount(this.currentPartitionIdx) <= MAX_LOG_DECODING_ERRORS;
}
protected abstract D decodeRecord(ByteArrayBasedKafkaRecord kafkaConsumerRecord) throws IOException;
@@ -392,112 +303,16 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
@Override
public void close() throws IOException {
- if (currentPartitionIdx != INITIAL_PARTITION_IDX) {
- updateStatisticsForCurrentPartition();
+ if (!allPartitionsFinished()) {
+ this.statsTracker.updateStatisticsForCurrentPartition(currentPartitionIdx, readStartTime, getLastSuccessfulRecordHeaderTimestamp());
}
-
- Map<KafkaPartition, Map<String, String>> tagsForPartitionsMap = Maps.newHashMap();
-
// Add error partition count and error message count to workUnitState
- this.workUnitState.setProp(ConfigurationKeys.ERROR_PARTITION_COUNT, this.errorPartitions.size());
- this.workUnitState.setProp(ConfigurationKeys.ERROR_MESSAGE_UNDECODABLE_COUNT, this.undecodableMessageCount);
-
- for (int i = 0; i < this.partitions.size(); i++) {
- LOG.info(String.format("Actual high watermark for partition %s=%d, expected=%d", this.partitions.get(i),
- this.nextWatermark.get(i), this.highWatermark.get(i)));
- tagsForPartitionsMap.put(this.partitions.get(i), createTagsForPartition(i));
- }
+ this.workUnitState.setProp(ConfigurationKeys.ERROR_PARTITION_COUNT, this.statsTracker.getErrorPartitionCount());
+ this.workUnitState.setProp(ConfigurationKeys.ERROR_MESSAGE_UNDECODABLE_COUNT, this.statsTracker.getUndecodableMessageCount());
this.workUnitState.setActualHighWatermark(this.nextWatermark);
-
if (isInstrumentationEnabled()) {
- for (Map.Entry<KafkaPartition, Map<String, String>> eventTags : tagsForPartitionsMap.entrySet()) {
- new EventSubmitter.Builder(getMetricContext(), GOBBLIN_KAFKA_NAMESPACE).build()
- .submit(KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME, eventTags.getValue());
- }
+ this.statsTracker.emitTrackingEvents(getMetricContext(), this.lowWatermark, this.highWatermark, this.nextWatermark);
}
-
- this.closer.close();
- }
-
- protected Map<String, String> createTagsForPartition(int partitionId) {
- Map<String, String> tagsForPartition = Maps.newHashMap();
- KafkaPartition partition = this.partitions.get(partitionId);
-
- tagsForPartition.put(TOPIC, partition.getTopicName());
- tagsForPartition.put(PARTITION, Integer.toString(partition.getId()));
- tagsForPartition.put(LOW_WATERMARK, Long.toString(this.lowWatermark.get(partitionId)));
- tagsForPartition.put(ACTUAL_HIGH_WATERMARK, Long.toString(this.nextWatermark.get(partitionId)));
-
- // These are used to compute the load factor,
- // gobblin consumption rate relative to the kafka production rate.
- // The gobblin rate is computed as (processed record count/elapsed time)
- // The kafka rate is computed as (expected high watermark - previous latest offset) /
- // (current offset fetch epoch time - previous offset fetch epoch time).
- tagsForPartition.put(EXPECTED_HIGH_WATERMARK, Long.toString(this.highWatermark.get(partitionId)));
- tagsForPartition.put(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME,
- Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
- KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME, partitionId)));
- tagsForPartition.put(KafkaSource.OFFSET_FETCH_EPOCH_TIME,
- Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
- KafkaSource.OFFSET_FETCH_EPOCH_TIME, partitionId)));
- tagsForPartition.put(KafkaSource.PREVIOUS_LATEST_OFFSET,
- Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
- KafkaSource.PREVIOUS_LATEST_OFFSET, partitionId)));
-
- tagsForPartition.put(KafkaSource.PREVIOUS_LOW_WATERMARK,
- Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
- KafkaSource.PREVIOUS_LOW_WATERMARK, partitionId)));
- tagsForPartition.put(KafkaSource.PREVIOUS_HIGH_WATERMARK,
- Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
- KafkaSource.PREVIOUS_HIGH_WATERMARK, partitionId)));
- tagsForPartition.put(KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME,
- Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
- KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME, partitionId)));
- tagsForPartition.put(KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME,
- Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
- KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME, partitionId)));
-
- tagsForPartition.put(KafkaSource.START_FETCH_EPOCH_TIME, Long.toString(this.startFetchEpochTime.getOrDefault(partition, 0L)));
- tagsForPartition.put(KafkaSource.STOP_FETCH_EPOCH_TIME, Long.toString(this.stopFetchEpochTime.getOrDefault(partition, 0L)));
- this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.START_FETCH_EPOCH_TIME, partitionId),
- Long.toString(this.startFetchEpochTime.getOrDefault(partition, 0L)));
- this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.STOP_FETCH_EPOCH_TIME, partitionId),
- Long.toString(this.stopFetchEpochTime.getOrDefault(partition, 0L)));
-
- if (this.processedRecordCount.containsKey(partition)) {
- tagsForPartition.put(PROCESSED_RECORD_COUNT, Long.toString(this.processedRecordCount.get(partition)));
- tagsForPartition.put(PARTITION_TOTAL_SIZE, Long.toString(this.partitionTotalSize.get(partition)));
- tagsForPartition.put(ELAPSED_TIME, Long.toString(this.elapsedTime.get(partition)));
- tagsForPartition.put(DECODE_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
- this.decodeRecordTime.get(partition))));
- tagsForPartition.put(FETCH_MESSAGE_BUFFER_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
- this.fetchMessageBufferTime.get(partition))));
- tagsForPartition.put(READ_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(
- this.readRecordTime.get(partition))));
- } else {
- tagsForPartition.put(PROCESSED_RECORD_COUNT, "0");
- tagsForPartition.put(PARTITION_TOTAL_SIZE, "0");
- tagsForPartition.put(ELAPSED_TIME, "0");
- tagsForPartition.put(DECODE_RECORD_TIME, "0");
- tagsForPartition.put(FETCH_MESSAGE_BUFFER_TIME, "0");
- tagsForPartition.put(READ_RECORD_TIME, "0");
- }
-
- tagsForPartition.put(UNDECODABLE_MESSAGE_COUNT,
- Long.toString(this.decodingErrorCount.getOrDefault(partition, 0L)));
-
- // Commit avg time to pull a record for each partition
- if (this.avgMillisPerRecord.containsKey(partition)) {
- double avgMillis = this.avgMillisPerRecord.get(partition);
- LOG.info(String.format("Avg time to pull a record for partition %s = %f milliseconds", partition, avgMillis));
- KafkaUtils.setPartitionAvgRecordMillis(this.workUnitState, partition, avgMillis);
- tagsForPartition.put(AVG_RECORD_PULL_TIME, Double.toString(avgMillis));
- } else {
- LOG.info(String.format("Avg time to pull a record for partition %s not recorded", partition));
- tagsForPartition.put(AVG_RECORD_PULL_TIME, Double.toString(-1));
- }
-
- return tagsForPartition;
}
@Deprecated
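The getLastSuccessfulRecordHeaderTimestamp() hook introduced above defaults to 0. A concrete extractor can override it so that the stats tracker's lastRecordHeaderTimestamp tag (see the new class below) carries a real value, e.g. in the streaming mode of execution. A hypothetical sketch, assumed to sit in the same package as KafkaExtractor, with the remaining abstract members left unimplemented:

    import org.apache.gobblin.configuration.WorkUnitState;

    public abstract class TimestampAwareExtractor<S, D> extends KafkaExtractor<S, D> {
      private long lastHeaderTimestamp;   // set by the concrete decodeRecord() implementation

      public TimestampAwareExtractor(WorkUnitState state) {
        super(state);
      }

      @Override
      protected long getLastSuccessfulRecordHeaderTimestamp() {
        // Flows into updateStatisticsForCurrentPartition() and from there into
        // the lastRecordHeaderTimestamp tag of the emitted tracking events.
        return this.lastHeaderTimestamp;
      }
    }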
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
new file mode 100644
index 0000000..8173418
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+
+
+/**
+ * A class that tracks KafkaExtractor statistics such as record decode time, processed record count, undecodable record count, etc.
+ *
+ */
+@Slf4j
+public class KafkaExtractorStatsTracker {
+ // Constants for event submission
+ public static final String TOPIC = "topic";
+ public static final String PARTITION = "partition";
+
+ private static final String GOBBLIN_KAFKA_NAMESPACE = "gobblin.kafka";
+ private static final String KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME = "KafkaExtractorTopicMetadata";
+ private static final String LOW_WATERMARK = "lowWatermark";
+ private static final String ACTUAL_HIGH_WATERMARK = "actualHighWatermark";
+ private static final String EXPECTED_HIGH_WATERMARK = "expectedHighWatermark";
+ private static final String ELAPSED_TIME = "elapsedTime";
+ private static final String PROCESSED_RECORD_COUNT = "processedRecordCount";
+ private static final String UNDECODABLE_MESSAGE_COUNT = "undecodableMessageCount";
+ private static final String PARTITION_TOTAL_SIZE = "partitionTotalSize";
+ private static final String AVG_RECORD_PULL_TIME = "avgRecordPullTime";
+ private static final String AVG_RECORD_SIZE = "avgRecordSize";
+ private static final String READ_RECORD_TIME = "readRecordTime";
+ private static final String DECODE_RECORD_TIME = "decodeRecordTime";
+ private static final String FETCH_MESSAGE_BUFFER_TIME = "fetchMessageBufferTime";
+ private static final String LAST_RECORD_HEADER_TIMESTAMP = "lastRecordHeaderTimestamp";
+
+ @Getter
+ private final Map<KafkaPartition, ExtractorStats> statsMap;
+ private final Set<Integer> errorPartitions;
+ private final WorkUnitState workUnitState;
+
+ //A global count of the undecodable messages encountered by the KafkaExtractor across all Kafka
+ //TopicPartitions.
+ @Getter
+ private int undecodableMessageCount = 0;
+ private List<KafkaPartition> partitions;
+
+ public KafkaExtractorStatsTracker(WorkUnitState state, List<KafkaPartition> partitions) {
+ this.workUnitState = state;
+ this.partitions = partitions;
+ this.statsMap = Maps.newHashMapWithExpectedSize(this.partitions.size());
+ this.partitions.forEach(partition -> this.statsMap.put(partition, new ExtractorStats()));
+ this.errorPartitions = Sets.newHashSet();
+ }
+
+ public int getErrorPartitionCount() {
+ return this.errorPartitions.size();
+ }
+
+ /**
+ * A Java POJO that encapsulates various extractor stats.
+ */
+ @Data
+ public static class ExtractorStats {
+ private long decodingErrorCount = -1L;
+ private double avgMillisPerRecord = -1;
+ private long avgRecordSize;
+ private long elapsedTime;
+ private long processedRecordCount;
+ private long partitionTotalSize;
+ private long decodeRecordTime;
+ private long fetchMessageBufferTime;
+ private long readRecordTime;
+ private long startFetchEpochTime;
+ private long stopFetchEpochTime;
+ private long lastSuccessfulRecordHeaderTimestamp;
+ }
+
+ /**
+ * Get the number of undecodable records observed for a given partition.
+ *
+ * @param partitionIdx the index of the Kafka topic partition.
+ * @return the number of undecodable records for the given partition index.
+ */
+ public Long getDecodingErrorCount(int partitionIdx) {
+ return this.statsMap.get(this.partitions.get(partitionIdx)).getDecodingErrorCount();
+ }
+
+ /**
+ * Called when the KafkaExtractor encounters an undecodable record.
+ */
+ public void onUndecodeableRecord(int partitionIdx) {
+ this.errorPartitions.add(partitionIdx);
+ this.undecodableMessageCount++;
+ incrementErrorCount(partitionIdx);
+ }
+
+ private void incrementErrorCount(int partitionIdx) {
+ this.statsMap.computeIfPresent(this.partitions.get(partitionIdx), (k, v) -> {
+ if (v.decodingErrorCount < 0) {
+ v.decodingErrorCount = 1;
+ } else {
+ v.decodingErrorCount++;
+ }
+ return v;
+ });
+ }
+
+ public void resetStartFetchEpochTime(int partitionIdx) {
+ this.statsMap.computeIfPresent(this.partitions.get(partitionIdx), (k, v) -> {
+ v.startFetchEpochTime = System.currentTimeMillis();
+ return v;
+ });
+ }
+
+ /**
+ * A method that is called when a Kafka record is successfully decoded.
+ * @param partitionIdx the index of the Kafka partition.
+ * @param readStartTime the start time when readRecord() is invoked.
+ * @param decodeStartTime the time instant immediately before a record decoding begins.
+ * @param recordSizeInBytes the size of the decoded record in bytes.
+ */
+ public void onDecodeableRecord(int partitionIdx, long readStartTime, long decodeStartTime, long recordSizeInBytes) {
+ this.statsMap.computeIfPresent(this.partitions.get(partitionIdx), (k, v) -> {
+ long currentTime = System.nanoTime();
+ v.processedRecordCount++;
+ v.partitionTotalSize += recordSizeInBytes;
+ v.decodeRecordTime += currentTime - decodeStartTime;
+ v.readRecordTime += currentTime - readStartTime;
+ return v;
+ });
+ }
+
+ /**
+ * A method that is called after a batch of records has been fetched from Kafka, e.g. via a consumer.poll().
+ * @param partitionIdx the index of Kafka partition
+ * @param fetchStartTime the time instant immediately before fetching records from Kafka.
+ */
+ public void onFetchNextMessageBuffer(int partitionIdx, long fetchStartTime) {
+ this.statsMap.computeIfPresent(this.partitions.get(partitionIdx), (k, v) -> {
+ v.fetchMessageBufferTime += System.nanoTime() - fetchStartTime;
+ return v;
+ });
+ }
+
+ /**
+ * A method that is called when a partition has been fully processed.
+ * @param partitionIdx the index of the Kafka partition
+ * @param readStartTime the start time of the readRecord() invocation.
+ */
+ void onPartitionReadComplete(int partitionIdx, long readStartTime) {
+ this.statsMap.computeIfPresent(this.partitions.get(partitionIdx), (k, v) -> {
+ v.readRecordTime += System.nanoTime() - readStartTime;
+ return v;
+ });
+ }
+
+ /**
+ * A method that is invoked to update the statistics for current partition. In the batch mode of execution, this is
+ * invoked when a partition has been processed and before the next partition can be processed. In the streaming mode of
+ * execution, this method is invoked on every flush.
+ * @param partitionIdx the index of Kafka partition
+ * @param readStartTime the start time when readRecord() is invoked.
+ * @param lastSuccessfulRecordHeaderTimestamp the header timestamp of the last successfully decoded record.
+ */
+ public void updateStatisticsForCurrentPartition(int partitionIdx, long readStartTime, long lastSuccessfulRecordHeaderTimestamp) {
+ long stopFetchEpochTime = System.currentTimeMillis();
+ this.statsMap.computeIfPresent(this.partitions.get(partitionIdx), (k, v) -> {
+ v.stopFetchEpochTime = stopFetchEpochTime;
+ if (v.processedRecordCount != 0) {
+ v.elapsedTime = stopFetchEpochTime - v.startFetchEpochTime;
+ //Compute average stats
+ v.avgMillisPerRecord = (double) v.elapsedTime / (double) v.processedRecordCount;
+ v.avgRecordSize = v.partitionTotalSize / v.processedRecordCount;
+ v.lastSuccessfulRecordHeaderTimestamp = lastSuccessfulRecordHeaderTimestamp;
+ }
+ return v;
+ });
+ onPartitionReadComplete(partitionIdx, readStartTime);
+ }
+
+ private Map<String, String> createTagsForPartition(int partitionId, MultiLongWatermark lowWatermark, MultiLongWatermark highWatermark, MultiLongWatermark nextWatermark) {
+ Map<String, String> tagsForPartition = Maps.newHashMap();
+ KafkaPartition partition = this.partitions.get(partitionId);
+
+ tagsForPartition.put(TOPIC, partition.getTopicName());
+ tagsForPartition.put(PARTITION, Integer.toString(partition.getId()));
+ tagsForPartition.put(LOW_WATERMARK, Long.toString(lowWatermark.get(partitionId)));
+ tagsForPartition.put(ACTUAL_HIGH_WATERMARK, Long.toString(nextWatermark.get(partitionId)));
+
+ // These are used to compute the load factor,
+ // gobblin consumption rate relative to the kafka production rate.
+ // The gobblin rate is computed as (processed record count/elapsed time)
+ // The kafka rate is computed as (expected high watermark - previous latest offset) /
+ // (current offset fetch epoch time - previous offset fetch epoch time).
+ tagsForPartition.put(EXPECTED_HIGH_WATERMARK, Long.toString(highWatermark.get(partitionId)));
+ tagsForPartition.put(KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME,
+ Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
+ KafkaSource.PREVIOUS_OFFSET_FETCH_EPOCH_TIME, partitionId)));
+ tagsForPartition.put(KafkaSource.OFFSET_FETCH_EPOCH_TIME,
+ Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
+ KafkaSource.OFFSET_FETCH_EPOCH_TIME, partitionId)));
+ tagsForPartition.put(KafkaSource.PREVIOUS_LATEST_OFFSET,
+ Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
+ KafkaSource.PREVIOUS_LATEST_OFFSET, partitionId)));
+
+ tagsForPartition.put(KafkaSource.PREVIOUS_LOW_WATERMARK,
+ Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
+ KafkaSource.PREVIOUS_LOW_WATERMARK, partitionId)));
+ tagsForPartition.put(KafkaSource.PREVIOUS_HIGH_WATERMARK,
+ Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
+ KafkaSource.PREVIOUS_HIGH_WATERMARK, partitionId)));
+ tagsForPartition.put(KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME,
+ Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
+ KafkaSource.PREVIOUS_START_FETCH_EPOCH_TIME, partitionId)));
+ tagsForPartition.put(KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME,
+ Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
+ KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME, partitionId)));
+
+ ExtractorStats stats = this.statsMap.getOrDefault(partition, new ExtractorStats());
+
+ tagsForPartition.put(KafkaSource.START_FETCH_EPOCH_TIME, Long.toString(stats.getStartFetchEpochTime()));
+ tagsForPartition.put(KafkaSource.STOP_FETCH_EPOCH_TIME, Long.toString(stats.getStopFetchEpochTime()));
+ this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.START_FETCH_EPOCH_TIME, partitionId),
+ Long.toString(stats.getStartFetchEpochTime()));
+ this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.STOP_FETCH_EPOCH_TIME, partitionId),
+ Long.toString(stats.getStopFetchEpochTime()));
+ tagsForPartition.put(PROCESSED_RECORD_COUNT, Long.toString(stats.getProcessedRecordCount()));
+ tagsForPartition.put(PARTITION_TOTAL_SIZE, Long.toString(stats.getPartitionTotalSize()));
+ tagsForPartition.put(AVG_RECORD_SIZE, Long.toString(stats.getAvgRecordSize()));
+ tagsForPartition.put(ELAPSED_TIME, Long.toString(stats.getElapsedTime()));
+ tagsForPartition.put(DECODE_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(stats.getDecodeRecordTime())));
+ tagsForPartition.put(FETCH_MESSAGE_BUFFER_TIME,
+ Long.toString(TimeUnit.NANOSECONDS.toMillis(stats.getFetchMessageBufferTime())));
+ tagsForPartition.put(READ_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(stats.getReadRecordTime())));
+ tagsForPartition.put(UNDECODABLE_MESSAGE_COUNT, Long.toString(stats.getDecodingErrorCount()));
+ tagsForPartition.put(LAST_RECORD_HEADER_TIMESTAMP, Long.toString(stats.getLastSuccessfulRecordHeaderTimestamp()));
+
+ // Commit avg time to pull a record for each partition
+ double avgMillis = stats.getAvgMillisPerRecord();
+ if (avgMillis >= 0) {
+ log.info(String.format("Avg time to pull a record for partition %s = %f milliseconds", partition, avgMillis));
+ KafkaUtils.setPartitionAvgRecordMillis(this.workUnitState, partition, avgMillis);
+ tagsForPartition.put(AVG_RECORD_PULL_TIME, Double.toString(avgMillis));
+ } else {
+ log.info(String.format("Avg time to pull a record for partition %s not recorded", partition));
+ tagsForPartition.put(AVG_RECORD_PULL_TIME, Double.toString(-1));
+ }
+
+ return tagsForPartition;
+ }
+
+ /**
+ * Emit Tracking events reporting the various statistics to be consumed by a monitoring application.
+ * @param context the current {@link MetricContext}
+ * @param lowWatermark begin Kafka offset for each topic partition
+ * @param highWatermark the expected last Kafka offset for each topic partition to be consumed by the Extractor
+ * @param nextWatermark the offset of next valid message for each Kafka topic partition consumed by the Extractor
+ */
+ public void emitTrackingEvents(MetricContext context, MultiLongWatermark lowWatermark, MultiLongWatermark highWatermark,
+ MultiLongWatermark nextWatermark) {
+ Map<KafkaPartition, Map<String, String>> tagsForPartitionsMap = Maps.newHashMap();
+ for (int i = 0; i < this.partitions.size(); i++) {
+ log.info(String.format("Actual high watermark for partition %s=%d, expected=%d", this.partitions.get(i),
+ nextWatermark.get(i), highWatermark.get(i)));
+ tagsForPartitionsMap
+ .put(this.partitions.get(i), createTagsForPartition(i, lowWatermark, highWatermark, nextWatermark));
+ }
+ for (Map.Entry<KafkaPartition, Map<String, String>> eventTags : tagsForPartitionsMap.entrySet()) {
+ new EventSubmitter.Builder(context, GOBBLIN_KAFKA_NAMESPACE).build()
+ .submit(KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME, eventTags.getValue());
+ }
+ }
+
+ /**
+ * Reset all KafkaExtractor stats.
+ */
+ public void reset() {
+ this.partitions.forEach(partition -> this.statsMap.put(partition, new ExtractorStats()));
+ for (int partitionIdx = 0; partitionIdx < this.partitions.size(); partitionIdx++) {
+ resetStartFetchEpochTime(partitionIdx);
+ }
+ }
+}
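Taken together, the tracker is driven entirely through the callbacks above, mirroring the hooks the refactored KafkaExtractor now calls from its read loop and close(). A sketch of the per-partition lifecycle; the class name and the literal 100-byte record size are illustrative, and the class is assumed to sit in the tracker's package so KafkaPartition, MultiLongWatermark and the tracker resolve without imports:

    import java.util.List;

    import org.apache.gobblin.configuration.WorkUnitState;
    import org.apache.gobblin.metrics.MetricContext;

    public class StatsTrackerLifecycleSketch {

      static void trackOnePartition(WorkUnitState state, List<KafkaPartition> partitions,
          MetricContext context, MultiLongWatermark lowWatermark, MultiLongWatermark highWatermark,
          MultiLongWatermark nextWatermark) {
        KafkaExtractorStatsTracker tracker = new KafkaExtractorStatsTracker(state, partitions);
        int partitionIdx = 0;

        tracker.resetStartFetchEpochTime(partitionIdx);   // fetch of this partition begins
        long readStartTime = System.nanoTime();

        long fetchStartTime = System.nanoTime();
        // ... fetch a message buffer via GobblinKafkaConsumerClient.consume() ...
        tracker.onFetchNextMessageBuffer(partitionIdx, fetchStartTime);

        long decodeStartTime = System.nanoTime();
        try {
          // ... decode the next record; 100L stands in for its size in bytes ...
          tracker.onDecodeableRecord(partitionIdx, readStartTime, decodeStartTime, 100L);
        } catch (Throwable t) {
          tracker.onUndecodeableRecord(partitionIdx);     // counts the error, marks the partition
        }

        // Partition finished (batch mode) or flush reached (streaming mode).
        tracker.updateStatisticsForCurrentPartition(partitionIdx, readStartTime, 0L);

        // At close(): one KafkaExtractorTopicMetadata event per partition.
        tracker.emitTrackingEvents(context, lowWatermark, highWatermark, nextWatermark);
      }
    }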
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaPartition.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaPartition.java
index 0a11bbf..29cea5b 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaPartition.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaPartition.java
@@ -31,6 +31,7 @@ public final class KafkaPartition {
private final int id;
private final String topicName;
private KafkaLeader leader;
+ private int hashCode;
public static class Builder {
private int id = 0;
@@ -103,10 +104,14 @@ public final class KafkaPartition {
@Override
public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + this.id;
- result = prime * result + ((this.topicName == null) ? 0 : this.topicName.hashCode());
+ int result = hashCode;
+ if (result == 0) {
+ final int prime = 31;
+ result = 1;
+ result = prime * result + this.id;
+ result = prime * result + ((this.topicName == null) ? 0 : this.topicName.hashCode());
+ hashCode = result;
+ }
return result;
}
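The cached hashCode above is the same racy single-check idiom that java.lang.String uses: both fields feeding the hash (id, topicName) are final, recomputation is idempotent, and an int write is atomic, so a data race between threads at worst recomputes the same value. The one corner case is a partition whose hash is genuinely 0, which is recomputed on every call; that cost is accepted because KafkaPartition is now hashed on the per-record hot path of the stats tracker's statsMap lookups.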
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 62485f0..5fa53ea 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -30,14 +30,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
-import org.apache.gobblin.dataset.DatasetConstants;
-import org.apache.gobblin.dataset.DatasetDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Timer;
-import com.google.common.base.Joiner;
import com.google.common.base.Function;
+import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
@@ -48,12 +46,19 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.typesafe.config.Config;
+import lombok.Getter;
+import lombok.Setter;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory;
+import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.extractor.extract.EventBasedSource;
import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker;
@@ -66,13 +71,7 @@ import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.DatasetFilterUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.dataset.DatasetUtils;
-import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.metrics.MetricContext;
-
-import lombok.Getter;
-import lombok.Setter;
-import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
@@ -115,6 +114,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
public static final String PREVIOUS_LATEST_OFFSET = "previousLatestOffset";
public static final String OFFSET_FETCH_EPOCH_TIME = "offsetFetchEpochTime";
public static final String PREVIOUS_OFFSET_FETCH_EPOCH_TIME = "previousOffsetFetchEpochTime";
+ public static final String NUM_TOPIC_PARTITIONS = "numTopicPartitions";
public static final String GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS = "gobblin.kafka.consumerClient.class";
public static final String GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION =
"gobblin.kafka.extract.allowTableTypeAndNamspaceCustomization";
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
new file mode 100644
index 0000000..abd0447
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+
+
+public class KafkaExtractorStatsTrackerTest {
+ List<KafkaPartition> kafkaPartitions = new ArrayList<>();
+ private KafkaExtractorStatsTracker extractorStatsTracker;
+
+ @BeforeClass
+ public void setUp() {
+ kafkaPartitions.add(new KafkaPartition.Builder().withTopicName("test-topic").withId(0).build());
+ kafkaPartitions.add(new KafkaPartition.Builder().withTopicName("test-topic").withId(1).build());
+ this.extractorStatsTracker = new KafkaExtractorStatsTracker(new WorkUnitState(), kafkaPartitions);
+ }
+
+ @Test
+ public void testOnUndecodeableRecord() {
+ //Ensure that error counters are initialized correctly
+ Assert.assertEquals(this.extractorStatsTracker.getErrorPartitionCount(), 0);
+ Assert.assertEquals(this.extractorStatsTracker.getDecodingErrorCount(0).longValue(), -1);
+ Assert.assertEquals(this.extractorStatsTracker.getDecodingErrorCount(1).longValue(), -1);
+
+ //Ensure that error counters are updated correctly after 1st call to KafkaExtractorStatsTracker#onUndecodeableRecord()
+ this.extractorStatsTracker.onUndecodeableRecord(0);
+ Assert.assertEquals(this.extractorStatsTracker.getDecodingErrorCount(0).longValue(), 1);
+ Assert.assertEquals(this.extractorStatsTracker.getDecodingErrorCount(1).longValue(), -1);
+ Assert.assertEquals(this.extractorStatsTracker.getErrorPartitionCount(), 1);
+
+ //Ensure that error counters are updated correctly after 2nd call to KafkaExtractorStatsTracker#onUndecodeableRecord()
+ this.extractorStatsTracker.onUndecodeableRecord(0);
+ Assert.assertEquals(this.extractorStatsTracker.getDecodingErrorCount(0).longValue(), 2);
+ Assert.assertEquals(this.extractorStatsTracker.getDecodingErrorCount(1).longValue(), -1);
+ Assert.assertEquals(this.extractorStatsTracker.getErrorPartitionCount(), 1);
+ }
+
+ @Test
+ public void testResetStartFetchEpochTime() {
+ long currentTime = System.currentTimeMillis();
+ this.extractorStatsTracker.resetStartFetchEpochTime(1);
+ Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getStartFetchEpochTime() >= currentTime);
+ }
+
+ @Test
+ public void testOnDecodeableRecord() throws InterruptedException {
+ long readStartTime = System.nanoTime();
+ Thread.sleep(1);
+ long decodeStartTime = System.nanoTime();
+
+ Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(), 0);
+ Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getPartitionTotalSize(), 0);
+ Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime() == 0);
+ Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime() == 0);
+
+ this.extractorStatsTracker.onDecodeableRecord(0, readStartTime, decodeStartTime, 100);
+ Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(), 1);
+ Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getPartitionTotalSize(), 100);
+ Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime() > 0);
+ Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime() > 0);
+
+ readStartTime = System.nanoTime();
+ Thread.sleep(1);
+ decodeStartTime = System.nanoTime();
+ long previousDecodeRecordTime = this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime();
+ long previousReadRecordTime = this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime();
+
+ this.extractorStatsTracker.onDecodeableRecord(0, readStartTime, decodeStartTime, 100);
+ Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(), 2);
+ Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getPartitionTotalSize(), 200);
+ Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime() > previousDecodeRecordTime);
+ Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime() > previousReadRecordTime);
+ }
+
+ @Test
+ public void testOnFetchNextMessageBuffer() throws InterruptedException {
+ Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getFetchMessageBufferTime(), 0);
+ long fetchStartTime = System.nanoTime();
+ Thread.sleep(1);
+ this.extractorStatsTracker.onFetchNextMessageBuffer(1, fetchStartTime);
+ Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getFetchMessageBufferTime() > 0);
+ }
+
+ @Test
+ public void testOnPartitionReadComplete() throws InterruptedException {
+ Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getReadRecordTime(), 0);
+ long readStartTime = System.nanoTime();
+ Thread.sleep(1);
+ this.extractorStatsTracker.onPartitionReadComplete(1, readStartTime);
+ Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getReadRecordTime() > 0);
+ }
+
+ @Test (dependsOnMethods = "testOnDecodeableRecord")
+ public void testUpdateStatisticsForCurrentPartition()
+ throws InterruptedException {
+ long readStartTime = System.nanoTime();
+ Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getStopFetchEpochTime(), 0);
+ Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getElapsedTime(), 0);
+ Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getAvgMillisPerRecord() < 0);
+ this.extractorStatsTracker.updateStatisticsForCurrentPartition(0, readStartTime, 0);
+ Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getStopFetchEpochTime() > 0);
+ Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getElapsedTime() > 0);
+ Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getAvgMillisPerRecord() > 0);
+ Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getAvgRecordSize(), 100);
+
+ readStartTime = System.nanoTime();
+ Thread.sleep(1);
+ long decodeStartTime = System.nanoTime();
+ this.extractorStatsTracker.onDecodeableRecord(1, readStartTime, decodeStartTime, 100);
+ this.extractorStatsTracker.updateStatisticsForCurrentPartition(1, readStartTime, 0);
+ Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getElapsedTime() > 0);
+ Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getAvgMillisPerRecord() > 0);
+ Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getAvgRecordSize(), 100);
+ }
+}
\ No newline at end of file