You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/21 19:09:06 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-970] Pass metric context from the KafkaSource to the KafkaWorkUnitPacker
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ae62669 [GOBBLIN-970] Pass metric context from the KafkaSource to the KafkaWorkUnitPacker
ae62669 is described below
commit ae62669d31f89a9b62e520f1ca3969536c36db19
Author: sv2000 <su...@gmail.com>
AuthorDate: Thu Nov 21 11:08:58 2019 -0800
[GOBBLIN-970] Pass metric context from the KafkaSource to the KafkaWorkUnitPacker
Closes #2815 from sv2000/metricContextPacker
---
.../source/extractor/extract/kafka/KafkaSource.java | 2 +-
.../kafka/workunit/packer/KafkaWorkUnitPacker.java | 20 +++++++++++++++-----
.../workunit/packer/KafkaWorkUnitPackerTest.java | 7 ++++---
.../runtime/StateStoreBasedWatermarkStorage.java | 2 +-
4 files changed, 21 insertions(+), 10 deletions(-)
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 5fa53ea..ac10acd 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -263,7 +263,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
//determine the number of mappers
int maxMapperNum =
state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS);
- KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state);
+ KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext));
int numOfMultiWorkunits = maxMapperNum;
if(state.contains(ConfigurationKeys.MR_TARGET_MAPPER_SIZE)) {
double totalEstDataSize = kafkaWorkUnitPacker.setWorkUnitEstSizes(workUnits);
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
index 31e30d7..1d34aec 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
@@ -23,7 +23,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +36,7 @@ import com.google.common.primitives.Doubles;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.source.extractor.WatermarkInterval;
import org.apache.gobblin.source.extractor.extract.AbstractSource;
@@ -46,6 +46,7 @@ import org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils;
import org.apache.gobblin.source.extractor.extract.kafka.MultiLongWatermark;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
/**
@@ -368,18 +369,23 @@ public abstract class KafkaWorkUnitPacker {
}
public static KafkaWorkUnitPacker getInstance(AbstractSource<?, ?> source, SourceState state) {
+ return getInstance(source, state, Optional.absent());
+ }
+
+ public static KafkaWorkUnitPacker getInstance(AbstractSource<?, ?> source, SourceState state,
+ Optional<MetricContext> metricContext) {
if (state.contains(KAFKA_WORKUNIT_PACKER_TYPE)) {
String packerTypeStr = state.getProp(KAFKA_WORKUNIT_PACKER_TYPE);
Optional<PackerType> packerType = Enums.getIfPresent(PackerType.class, packerTypeStr);
if (packerType.isPresent()) {
- return getInstance(packerType.get(), source, state);
+ return getInstance(packerType.get(), source, state, metricContext);
}
throw new IllegalArgumentException("WorkUnit packer type " + packerTypeStr + " not found");
}
- return getInstance(DEFAULT_PACKER_TYPE, source, state);
+ return getInstance(DEFAULT_PACKER_TYPE, source, state, metricContext);
}
- public static KafkaWorkUnitPacker getInstance(PackerType packerType, AbstractSource<?, ?> source, SourceState state) {
+ public static KafkaWorkUnitPacker getInstance(PackerType packerType, AbstractSource<?, ?> source, SourceState state, Optional<MetricContext> metricContext) {
switch (packerType) {
case SINGLE_LEVEL:
return new KafkaSingleLevelWorkUnitPacker(source, state);
@@ -388,7 +394,11 @@ public abstract class KafkaWorkUnitPacker {
case CUSTOM:
Preconditions.checkArgument(state.contains(KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE));
String className = state.getProp(KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE);
- return GobblinConstructorUtils.invokeConstructor(KafkaWorkUnitPacker.class, className, source, state);
+ try {
+ return (KafkaWorkUnitPacker) GobblinConstructorUtils.invokeLongestConstructor(Class.forName(className), source, state, metricContext);
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException(e);
+ }
default:
throw new IllegalArgumentException("WorkUnit packer type " + packerType + " not found");
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java
index e392566..89a81d0 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java
@@ -20,14 +20,15 @@ package org.apache.gobblin.source.extractor.extract.kafka.workunit.packer;
import java.util.List;
import java.util.Map;
-import org.apache.gobblin.configuration.SourceState;
-import org.apache.gobblin.source.extractor.extract.AbstractSource;
-import org.apache.gobblin.source.workunit.WorkUnit;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.source.extractor.extract.AbstractSource;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker.KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE;
import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker.KAFKA_WORKUNIT_PACKER_TYPE;
import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker.KAFKA_WORKUNIT_SIZE_ESTIMATOR_CUSTOMIZED_TYPE;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
index 651a3ce..2a32f42 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
@@ -137,7 +137,7 @@ public class StateStoreBasedWatermarkStorage implements WatermarkStorage {
return committed;
}
- Iterable<CheckpointableWatermarkState> getAllCommittedWatermarks() throws IOException {
+ public Iterable<CheckpointableWatermarkState> getAllCommittedWatermarks() throws IOException {
return _stateStore.getAll(_storeName);
}