You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2020/03/03 16:18:24 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1064] Make KafkaAvroSchemaRegistry extendable

This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 82b7a24  [GOBBLIN-1064] Make KafkaAvroSchemaRegistry extendable
82b7a24 is described below

commit 82b7a24c0c28b837331fa95242f5373c4db2f1f9
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Tue Mar 3 08:18:15 2020 -0800

    [GOBBLIN-1064] Make KafkaAvroSchemaRegistry extendable
    
    add writer schema to workUnitState
    
    directly use writer.latest.schema
    
    Closes #2905 from ZihanLi58/GOBBLIN-1064
---
 .../java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java  | 2 +-
 gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java | 2 --
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
index 2a216f2..c31c7c8 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
@@ -63,7 +63,7 @@ public class KafkaAvroSchemaRegistry extends KafkaSchemaRegistry<String, Schema>
   public static final int SCHEMA_ID_LENGTH_BYTE = 16;
   public static final byte MAGIC_BYTE = 0x0;
 
-  private final GenericObjectPool<HttpClient> httpClientPool;
+  protected final GenericObjectPool<HttpClient> httpClientPool;
   private final String url;
   private final Optional<Map<String, String>> namespaceOverride;
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index e80e7d3..138185c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.runtime.fork;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.slf4j.Logger;
@@ -543,7 +542,6 @@ public class Fork<S, D> implements Closeable, FinalState, RecordStreamConsumer<S
 
       writerId = this.taskId + "_" + taskStartTime;
     }
-
     DataWriterBuilder<Object, Object> builder = this.taskContext.getDataWriterBuilder(this.branches, this.index)
         .writeTo(Destination.of(this.taskContext.getDestinationType(this.branches, this.index), this.taskState))
         .writeInFormat(this.taskContext.getWriterOutputFormat(this.branches, this.index)).withWriterId(writerId)