You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/09/26 15:47:15 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-887] Generalize UniversalKafkaSource to accept Extractors that do not extend KafkaExtractor

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a1061e4  [GOBBLIN-887] Generalize UniversalKafkaSource to accept Extractors that do not extend KafkaExtractor
a1061e4 is described below

commit a1061e4ae3b296f5978cdabf79c550687329a35c
Author: autumnust <le...@linkedin.com>
AuthorDate: Thu Sep 26 08:47:02 2019 -0700

    [GOBBLIN-887] Generalize UniversalKafkaSource to accept Extractors that do not extend KafkaExtractor
    
    Closes #2741 from autumnust/KafkaExtractor
---
 .../extractor/extract/kafka/UniversalKafkaSource.java     | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/UniversalKafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/UniversalKafkaSource.java
index 87714d7..b227636 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/UniversalKafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/UniversalKafkaSource.java
@@ -19,19 +19,17 @@ package org.apache.gobblin.source.extractor.extract.kafka;
 
 import java.io.IOException;
 
-import com.google.common.base.Preconditions;
-
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.extractor.extract.EventBasedExtractor;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
-import lombok.AllArgsConstructor;
-import lombok.Getter;
+import com.google.common.base.Preconditions;
 
 
 /**
- * A {@link KafkaSource} to use with arbitrary {@link KafkaExtractor}. Specify the extractor to use with key
+ * A {@link KafkaSource} to use with arbitrary {@link EventBasedExtractor}. Specify the extractor to use with key
  * {@link #EXTRACTOR_TYPE}.
  */
 public class UniversalKafkaSource<S, D> extends KafkaSource<S, D> {
@@ -39,12 +37,13 @@ public class UniversalKafkaSource<S, D> extends KafkaSource<S, D> {
   public static final String EXTRACTOR_TYPE = "gobblin.source.kafka.extractorType";
 
   @Override
-  public Extractor<S, D> getExtractor(WorkUnitState state) throws IOException {
+  public Extractor<S, D> getExtractor(WorkUnitState state)
+      throws IOException {
     Preconditions.checkArgument(state.contains(EXTRACTOR_TYPE), "Missing key " + EXTRACTOR_TYPE);
 
     try {
-      ClassAliasResolver<KafkaExtractor> aliasResolver = new ClassAliasResolver<>(KafkaExtractor.class);
-      Class<? extends KafkaExtractor> klazz = aliasResolver.resolveClass(state.getProp(EXTRACTOR_TYPE));
+      ClassAliasResolver<EventBasedExtractor> aliasResolver = new ClassAliasResolver<>(EventBasedExtractor.class);
+      Class<? extends EventBasedExtractor> klazz = aliasResolver.resolveClass(state.getProp(EXTRACTOR_TYPE));
 
       return GobblinConstructorUtils.invokeLongestConstructor(klazz, state);
     } catch (ReflectiveOperationException e) {