You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2020/03/03 16:18:24 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1064] Make KafkaAvroSchemaRegistry extendable
This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 82b7a24 [GOBBLIN-1064] Make KafkaAvroSchemaRegistry extendable
82b7a24 is described below
commit 82b7a24c0c28b837331fa95242f5373c4db2f1f9
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Tue Mar 3 08:18:15 2020 -0800
[GOBBLIN-1064] Make KafkaAvroSchemaRegistry extendable
add writer schema to workUnitState
directly use writer.latest.schema
Closes #2905 from ZihanLi58/GOBBLIN-1064
---
.../java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java | 2 +-
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java | 2 --
2 files changed, 1 insertion(+), 3 deletions(-)
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
index 2a216f2..c31c7c8 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
@@ -63,7 +63,7 @@ public class KafkaAvroSchemaRegistry extends KafkaSchemaRegistry<String, Schema>
public static final int SCHEMA_ID_LENGTH_BYTE = 16;
public static final byte MAGIC_BYTE = 0x0;
- private final GenericObjectPool<HttpClient> httpClientPool;
+ protected final GenericObjectPool<HttpClient> httpClientPool;
private final String url;
private final Optional<Map<String, String>> namespaceOverride;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index e80e7d3..138185c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.runtime.fork;
import java.io.Closeable;
import java.io.IOException;
-import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
@@ -543,7 +542,6 @@ public class Fork<S, D> implements Closeable, FinalState, RecordStreamConsumer<S
writerId = this.taskId + "_" + taskStartTime;
}
-
DataWriterBuilder<Object, Object> builder = this.taskContext.getDataWriterBuilder(this.branches, this.index)
.writeTo(Destination.of(this.taskContext.getDestinationType(this.branches, this.index), this.taskState))
.writeInFormat(this.taskContext.getWriterOutputFormat(this.branches, this.index)).withWriterId(writerId)