You are viewing a plain-text version of this content. The link to the canonical version ("here") was not preserved in this plain-text rendering.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/21 19:09:06 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-970] Pass metric context from the KafkaSource to the KafkaWor…

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ae62669  [GOBBLIN-970] Pass metric context from the KafkaSource to the KafkaWor…
ae62669 is described below

commit ae62669d31f89a9b62e520f1ca3969536c36db19
Author: sv2000 <su...@gmail.com>
AuthorDate: Thu Nov 21 11:08:58 2019 -0800

    [GOBBLIN-970] Pass metric context from the KafkaSource to the KafkaWor…
    
    Closes #2815 from sv2000/metricContextPacker
---
 .../source/extractor/extract/kafka/KafkaSource.java  |  2 +-
 .../kafka/workunit/packer/KafkaWorkUnitPacker.java   | 20 +++++++++++++++-----
 .../workunit/packer/KafkaWorkUnitPackerTest.java     |  7 ++++---
 .../runtime/StateStoreBasedWatermarkStorage.java     |  2 +-
 4 files changed, 21 insertions(+), 10 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 5fa53ea..ac10acd 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -263,7 +263,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
       //determine the number of mappers
       int maxMapperNum =
           state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS);
-      KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state);
+      KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext));
       int numOfMultiWorkunits = maxMapperNum;
       if(state.contains(ConfigurationKeys.MR_TARGET_MAPPER_SIZE)) {
         double totalEstDataSize = kafkaWorkUnitPacker.setWorkUnitEstSizes(workUnits);
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
index 31e30d7..1d34aec 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
@@ -23,7 +23,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,6 +36,7 @@ import com.google.common.primitives.Doubles;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.source.extractor.WatermarkInterval;
 import org.apache.gobblin.source.extractor.extract.AbstractSource;
@@ -46,6 +46,7 @@ import org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils;
 import org.apache.gobblin.source.extractor.extract.kafka.MultiLongWatermark;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 
 /**
@@ -368,18 +369,23 @@ public abstract class KafkaWorkUnitPacker {
   }
 
   public static KafkaWorkUnitPacker getInstance(AbstractSource<?, ?> source, SourceState state) {
+    return getInstance(source, state, Optional.absent());
+  }
+
+  public static KafkaWorkUnitPacker getInstance(AbstractSource<?, ?> source, SourceState state,
+      Optional<MetricContext> metricContext) {
     if (state.contains(KAFKA_WORKUNIT_PACKER_TYPE)) {
       String packerTypeStr = state.getProp(KAFKA_WORKUNIT_PACKER_TYPE);
       Optional<PackerType> packerType = Enums.getIfPresent(PackerType.class, packerTypeStr);
       if (packerType.isPresent()) {
-        return getInstance(packerType.get(), source, state);
+        return getInstance(packerType.get(), source, state, metricContext);
       }
       throw new IllegalArgumentException("WorkUnit packer type " + packerTypeStr + " not found");
     }
-    return getInstance(DEFAULT_PACKER_TYPE, source, state);
+    return getInstance(DEFAULT_PACKER_TYPE, source, state, metricContext);
   }
 
-  public static KafkaWorkUnitPacker getInstance(PackerType packerType, AbstractSource<?, ?> source, SourceState state) {
+  public static KafkaWorkUnitPacker getInstance(PackerType packerType, AbstractSource<?, ?> source, SourceState state, Optional<MetricContext> metricContext) {
     switch (packerType) {
       case SINGLE_LEVEL:
         return new KafkaSingleLevelWorkUnitPacker(source, state);
@@ -388,7 +394,11 @@ public abstract class KafkaWorkUnitPacker {
       case CUSTOM:
         Preconditions.checkArgument(state.contains(KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE));
         String className = state.getProp(KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE);
-        return GobblinConstructorUtils.invokeConstructor(KafkaWorkUnitPacker.class, className, source, state);
+        try {
+          return (KafkaWorkUnitPacker) GobblinConstructorUtils.invokeLongestConstructor(Class.forName(className), source, state, metricContext);
+        } catch (ReflectiveOperationException e) {
+          throw new RuntimeException(e);
+        }
       default:
         throw new IllegalArgumentException("WorkUnit packer type " + packerType + " not found");
     }
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java
index e392566..89a81d0 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java
@@ -20,14 +20,15 @@ package org.apache.gobblin.source.extractor.extract.kafka.workunit.packer;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.gobblin.configuration.SourceState;
-import org.apache.gobblin.source.extractor.extract.AbstractSource;
-import org.apache.gobblin.source.workunit.WorkUnit;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.source.extractor.extract.AbstractSource;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
 import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker.KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE;
 import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker.KAFKA_WORKUNIT_PACKER_TYPE;
 import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker.KAFKA_WORKUNIT_SIZE_ESTIMATOR_CUSTOMIZED_TYPE;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
index 651a3ce..2a32f42 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
@@ -137,7 +137,7 @@ public class StateStoreBasedWatermarkStorage implements WatermarkStorage {
     return committed;
   }
 
-  Iterable<CheckpointableWatermarkState> getAllCommittedWatermarks() throws IOException {
+  public Iterable<CheckpointableWatermarkState> getAllCommittedWatermarks() throws IOException {
     return _stateStore.getAll(_storeName);
   }