You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/09/26 15:47:15 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-887]
Generalize UniversalKafkaSource to accept Extractors that do not extend
KafkaExtractor
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a1061e4 [GOBBLIN-887] Generalize UniversalKafkaSource to accept Extractors that do not extend KafkaExtractor
a1061e4 is described below
commit a1061e4ae3b296f5978cdabf79c550687329a35c
Author: autumnust <le...@linkedin.com>
AuthorDate: Thu Sep 26 08:47:02 2019 -0700
[GOBBLIN-887] Generalize UniversalKafkaSource to accept Extractors that do not extend KafkaExtractor
Closes #2741 from autumnust/KafkaExtractor
---
.../extractor/extract/kafka/UniversalKafkaSource.java | 15 +++++++--------
1 file changed, 7 insertions(+), 8 deletions(-)
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/UniversalKafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/UniversalKafkaSource.java
index 87714d7..b227636 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/UniversalKafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/UniversalKafkaSource.java
@@ -19,19 +19,17 @@ package org.apache.gobblin.source.extractor.extract.kafka;
import java.io.IOException;
-import com.google.common.base.Preconditions;
-
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.extractor.extract.EventBasedExtractor;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-import lombok.AllArgsConstructor;
-import lombok.Getter;
+import com.google.common.base.Preconditions;
/**
- * A {@link KafkaSource} to use with arbitrary {@link KafkaExtractor}. Specify the extractor to use with key
+ * A {@link KafkaSource} to use with arbitrary {@link EventBasedExtractor}. Specify the extractor to use with key
* {@link #EXTRACTOR_TYPE}.
*/
public class UniversalKafkaSource<S, D> extends KafkaSource<S, D> {
@@ -39,12 +37,13 @@ public class UniversalKafkaSource<S, D> extends KafkaSource<S, D> {
public static final String EXTRACTOR_TYPE = "gobblin.source.kafka.extractorType";
@Override
- public Extractor<S, D> getExtractor(WorkUnitState state) throws IOException {
+ public Extractor<S, D> getExtractor(WorkUnitState state)
+ throws IOException {
Preconditions.checkArgument(state.contains(EXTRACTOR_TYPE), "Missing key " + EXTRACTOR_TYPE);
try {
- ClassAliasResolver<KafkaExtractor> aliasResolver = new ClassAliasResolver<>(KafkaExtractor.class);
- Class<? extends KafkaExtractor> klazz = aliasResolver.resolveClass(state.getProp(EXTRACTOR_TYPE));
+ ClassAliasResolver<EventBasedExtractor> aliasResolver = new ClassAliasResolver<>(EventBasedExtractor.class);
+ Class<? extends EventBasedExtractor> klazz = aliasResolver.resolveClass(state.getProp(EXTRACTOR_TYPE));
return GobblinConstructorUtils.invokeLongestConstructor(klazz, state);
} catch (ReflectiveOperationException e) {