Posted to commits@gobblin.apache.org by sh...@apache.org on 2020/12/10 01:58:34 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1325] Add Kafka 1.x support : Writer only

This is an automated email from the ASF dual-hosted git repository.

shirshanka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f08d25  [GOBBLIN-1325] Add Kafka 1.x support : Writer only
1f08d25 is described below

commit 1f08d25e828737798abac750c59b6167abdba108
Author: Hanghang Liu <na...@gmail.com>
AuthorDate: Wed Dec 9 17:54:50 2020 -0800

    [GOBBLIN-1325] Add Kafka 1.x support : Writer only
    
    version set to 1.1 currently
    module under gobblin-kafka-1
    
    Closes #3163 from hanghangliu/GOBBLIN-1325-add-
---
 gobblin-modules/gobblin-kafka-1/build.gradle       |  91 ++++++
 .../gobblin/kafka/client/Kafka1ConsumerClient.java | 328 +++++++++++++++++++++
 .../kafka/serialize/LiAvroDeserializer.java        |  50 ++++
 .../gobblin/kafka/serialize/LiAvroSerializer.java  |  39 +++
 .../gobblin/kafka/writer/Kafka1DataWriter.java     | 187 ++++++++++++
 .../writer/Kafka1JsonObjectWriterBuilder.java      |  50 ++++
 .../kafka/writer/KafkaDataWriterBuilder.java       |  37 +++
 .../metrics/kafka/KafkaKeyValueProducerPusher.java | 106 +++++++
 .../gobblin/metrics/kafka/KafkaProducerPusher.java | 101 +++++++
 .../apache/gobblin/kafka/KafkaClusterTestBase.java | 130 ++++++++
 .../org/apache/gobblin/kafka/KafkaTestBase.java    | 252 ++++++++++++++++
 .../kafka/client/Kafka1ConsumerClientTest.java     |  73 +++++
 .../apache/gobblin/kafka/writer/ByPassWatcher.java |  30 ++
 .../gobblin/kafka/writer/Kafka1DataWriterTest.java | 261 ++++++++++++++++
 .../kafka/writer/Kafka1TopicProvisionTest.java     | 189 ++++++++++++
 .../reporter/KafkaKeyValueProducerPusherTest.java  |  98 ++++++
 .../metrics/reporter/KafkaProducerPusherTest.java  |  88 ++++++
 gradle/scripts/defaultBuildProperties.gradle       |   2 +
 gradle/scripts/dependencyDefinitions.gradle        |   4 +
 19 files changed, 2116 insertions(+)
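
For reference, a minimal sketch of how the new writer-only module could be exercised outside of a
full Gobblin job. The topic, broker, and serializer values below are illustrative assumptions, and
the producer-level keys are assumed to pass through to the Kafka 1.x producer under
KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX; a real job would normally build the
writer through KafkaDataWriterBuilder instead.

    // Hedged sketch only (not part of this commit).
    import java.util.Properties;

    import org.apache.gobblin.kafka.writer.Kafka1DataWriter;
    import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;

    public class Kafka1WriterSketch {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Topic the writer should produce to (illustrative value).
        props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, "demo-topic");
        // Producer-level settings, passed through under the producer config prefix (illustrative values).
        props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers",
            "localhost:9092");
        props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");

        Kafka1DataWriter<String, byte[]> writer = new Kafka1DataWriter<>(props);
        // Records are normally handed to writer.write(record, callback) by the Gobblin runtime.
        writer.flush();
        writer.close();
      }
    }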

diff --git a/gobblin-modules/gobblin-kafka-1/build.gradle b/gobblin-modules/gobblin-kafka-1/build.gradle
new file mode 100644
index 0000000..54800df
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/build.gradle
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: 'java'
+
+dependencies {
+    compile project(":gobblin-modules:gobblin-kafka-common")
+    compile project(":gobblin-core-base")
+    compile project(":gobblin-utility")
+    compile project(":gobblin-metrics-libs:gobblin-metrics")
+
+    compile externalDependency.avro
+    compile externalDependency.jacksonCore
+    compile externalDependency.jacksonMapper
+    compile externalDependency.commonsHttpClient
+    compile externalDependency.commonsPool
+    compile externalDependency.commonsLang3
+    compile externalDependency.guava
+    compile externalDependency.slf4j
+    compile externalDependency.httpclient
+    compile externalDependency.httpcore
+    compile(externalDependency.kafka1) {
+        exclude group: "com.sun.jmx", module: "jmxri"
+        exclude group: "com.sun.jdmk", module: "jmxtools"
+        exclude group: "javax.jms", module: "jms"
+    }
+    compile externalDependency.kafka1Client
+    compile externalDependency.lombok
+    compile externalDependency.metricsCore
+    compile externalDependency.typesafeConfig
+    compile externalDependency.findBugsAnnotations
+
+    runtime externalDependency.confluentAvroSerializer
+    runtime externalDependency.confluentJsonSerializer
+    runtime externalDependency.confluentSchemaRegistryClient
+    runtime externalDependency.protobuf
+
+    testCompile project(":gobblin-service")
+    testCompile project(":gobblin-modules:gobblin-service-kafka")
+    testCompile project(path: ":gobblin-runtime", configuration: "tests")
+    testCompile project(":gobblin-test-utils")
+    testCompile externalDependency.confluentAvroSerializer
+    testCompile externalDependency.jsonAssert
+    testCompile externalDependency.mockito
+    testCompile externalDependency.testng
+    testCompile(externalDependency.kafka1Test) {
+        exclude group: "com.sun.jmx", module: "jmxri"
+        exclude group: "com.sun.jdmk", module: "jmxtools"
+        exclude group: "javax.jms", module: "jms"
+    }
+    testCompile(externalDependency.kafka1ClientTest) {
+        exclude group: "com.sun.jmx", module: "jmxri"
+        exclude group: "com.sun.jdmk", module: "jmxtools"
+        exclude group: "javax.jms", module: "jms"
+    }
+}
+
+configurations {
+    compile { transitive = false }
+    // Remove xerces dependencies because of versioning issues. Standard JRE implementation should
+    // work. See also http://stackoverflow.com/questions/11677572/dealing-with-xerces-hell-in-java-maven
+    // HADOOP-5254 and MAPREDUCE-5664
+    all*.exclude group: 'xml-apis'
+    all*.exclude group: 'xerces'
+}
+
+test {
+    workingDir rootProject.rootDir
+    systemProperty "live.newtopic", System.getProperty("live.newtopic")
+    systemProperty "live.newtopic.replicationCount", System.getProperty("live.newtopic.replicationCount")
+    systemProperty "live.newtopic.partitionCount", System.getProperty("live.newtopic.partitionCount")
+    systemProperty "live.cluster.count", System.getProperty("live.cluster.count")
+    systemProperty "live.zookeeper", System.getProperty("live.zookeeper")
+    systemProperty "live.broker", System.getProperty("live.broker")
+}
+
+ext.classification = "library"
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClient.java b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClient.java
new file mode 100644
index 0000000..abc880c
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClient.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.kafka.client;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.KafkaMetric;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+
+/**
+ * A {@link GobblinKafkaConsumerClient} that uses the Kafka 1.x consumer client. Use {@link Factory#create(Config)} to create
+ * new {@link Kafka1ConsumerClient}s. The {@link Config} used to create clients must contain the required key {@value #GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY}
+ *
+ * @param <K> Message key type
+ * @param <V> Message value type
+ */
+@Slf4j
+public class Kafka1ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient {
+
+  private static final String CLIENT_BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
+  private static final String CLIENT_ENABLE_AUTO_COMMIT_KEY = "enable.auto.commit";
+  private static final String CLIENT_SESSION_TIMEOUT_KEY = "session.timeout.ms";
+  private static final String CLIENT_KEY_DESERIALIZER_CLASS_KEY = "key.deserializer";
+  private static final String CLIENT_VALUE_DESERIALIZER_CLASS_KEY = "value.deserializer";
+  private static final String CLIENT_GROUP_ID = "group.id";
+
+  private static final String DEFAULT_ENABLE_AUTO_COMMIT = Boolean.toString(false);
+  public static final String DEFAULT_KEY_DESERIALIZER =
+      "org.apache.kafka.common.serialization.StringDeserializer";
+  private static final String DEFAULT_GROUP_ID = "kafka1";
+
+  public static final String GOBBLIN_CONFIG_KEY_DESERIALIZER_CLASS_KEY = CONFIG_PREFIX
+      + CLIENT_KEY_DESERIALIZER_CLASS_KEY;
+  public static final String GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY = CONFIG_PREFIX
+      + CLIENT_VALUE_DESERIALIZER_CLASS_KEY;
+
+  private static final Config FALLBACK =
+      ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+          .put(CLIENT_ENABLE_AUTO_COMMIT_KEY, DEFAULT_ENABLE_AUTO_COMMIT)
+          .put(CLIENT_KEY_DESERIALIZER_CLASS_KEY, DEFAULT_KEY_DESERIALIZER)
+          .put(CLIENT_GROUP_ID, DEFAULT_GROUP_ID)
+          .build());
+
+  private final Consumer<K, V> consumer;
+
+  private Kafka1ConsumerClient(Config config) {
+    super(config);
+    Preconditions.checkArgument(config.hasPath(GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY),
+        "Missing required property " + GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY);
+
+    Properties props = new Properties();
+    props.put(CLIENT_BOOTSTRAP_SERVERS_KEY, Joiner.on(",").join(super.brokers));
+    props.put(CLIENT_SESSION_TIMEOUT_KEY, super.socketTimeoutMillis);
+
+    // grab all the config under "source.kafka" and add the defaults as fallback.
+    Config baseConfig = ConfigUtils.getConfigOrEmpty(config, CONFIG_NAMESPACE).withFallback(FALLBACK);
+    // get the "source.kafka.consumerConfig" config for extra config to pass along to Kafka with a fallback to the
+    // shared config that starts with "gobblin.kafka.sharedConfig"
+    Config specificConfig = ConfigUtils.getConfigOrEmpty(baseConfig, CONSUMER_CONFIG).withFallback(
+        ConfigUtils.getConfigOrEmpty(config, ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
+    // The specific config overrides settings in the base config
+    Config scopedConfig = specificConfig.withFallback(baseConfig.withoutPath(CONSUMER_CONFIG));
+    props.putAll(ConfigUtils.configToProperties(scopedConfig));
+
+    this.consumer = new KafkaConsumer<>(props);
+  }
+
+  public Kafka1ConsumerClient(Config config, Consumer<K, V> consumer) {
+    super(config);
+    this.consumer = consumer;
+  }
+
+  @Override
+  public List<KafkaTopic> getTopics() {
+    return FluentIterable.from(this.consumer.listTopics().entrySet())
+        .transform(new Function<Entry<String, List<PartitionInfo>>, KafkaTopic>() {
+          @Override
+          public KafkaTopic apply(Entry<String, List<PartitionInfo>> filteredTopicEntry) {
+            return new KafkaTopic(filteredTopicEntry.getKey(), Lists.transform(filteredTopicEntry.getValue(),
+                PARTITION_INFO_TO_KAFKA_PARTITION));
+          }
+        }).toList();
+  }
+
+  @Override
+  public long getEarliestOffset(KafkaPartition partition) {
+    TopicPartition topicPartition = new TopicPartition(partition.getTopicName(), partition.getId());
+    List<TopicPartition> topicPartitionList = Collections.singletonList(topicPartition);
+    this.consumer.assign(topicPartitionList);
+    this.consumer.seekToBeginning(topicPartitionList);
+
+    return this.consumer.position(topicPartition);
+  }
+
+  @Override
+  public long getLatestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
+    TopicPartition topicPartition = new TopicPartition(partition.getTopicName(), partition.getId());
+    List<TopicPartition> topicPartitionList = Collections.singletonList(topicPartition);
+    this.consumer.assign(topicPartitionList);
+    this.consumer.seekToEnd(topicPartitionList);
+
+    return this.consumer.position(topicPartition);
+  }
+
+  @Override
+  public Iterator<KafkaConsumerRecord> consume(KafkaPartition partition, long nextOffset, long maxOffset) {
+
+    if (nextOffset > maxOffset) {
+      return null;
+    }
+
+    this.consumer.assign(Lists.newArrayList(new TopicPartition(partition.getTopicName(), partition.getId())));
+    this.consumer.seek(new TopicPartition(partition.getTopicName(), partition.getId()), nextOffset);
+    return consume();
+  }
+
+  @Override
+  public Iterator<KafkaConsumerRecord> consume() {
+    try {
+      ConsumerRecords<K, V> consumerRecords = consumer.poll(super.fetchTimeoutMillis);
+
+      return Iterators.transform(consumerRecords.iterator(), input -> {
+        try {
+          return new Kafka1ConsumerRecord(input);
+        } catch (Throwable t) {
+          throw Throwables.propagate(t);
+        }
+      });
+    } catch (Exception e) {
+      log.error("Exception on polling records", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Subscribes to a Kafka topic.
+   * TODO: Add multi-topic support.
+   * @param topic the topic to subscribe to
+   */
+  @Override
+  public void subscribe(String topic) {
+    this.consumer.subscribe(Lists.newArrayList(topic), new NoOpConsumerRebalanceListener());
+  }
+
+  /**
+   * Subscribes to a Kafka topic with a {@link GobblinConsumerRebalanceListener}.
+   * TODO: Add multi-topic support.
+   * @param topic the topic to subscribe to
+   */
+  @Override
+  public void subscribe(String topic, GobblinConsumerRebalanceListener listener) {
+    this.consumer.subscribe(Lists.newArrayList(topic), new ConsumerRebalanceListener() {
+      @Override
+      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+        listener.onPartitionsRevoked(partitions.stream().map(a -> new KafkaPartition.Builder().withTopicName(a.topic()).withId(a.partition()).build()).collect(Collectors.toList()));
+      }
+
+      @Override
+      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+        listener.onPartitionsAssigned(partitions.stream().map(a -> new KafkaPartition.Builder().withTopicName(a.topic()).withId(a.partition()).build()).collect(Collectors.toList()));
+      }
+    });
+  }
+
+  @Override
+  public Map<String, Metric> getMetrics() {
+    Map<MetricName, KafkaMetric> kafkaMetrics = (Map<MetricName, KafkaMetric>) this.consumer.metrics();
+    Map<String, Metric> codaHaleMetricMap = new HashMap<>();
+
+    kafkaMetrics
+        .forEach((key, value) -> codaHaleMetricMap.put(canonicalMetricName(value), kafkaToCodaHaleMetric(value)));
+    return codaHaleMetricMap;
+  }
+
+  /**
+   * Commit offsets to Kafka asynchronously
+   */
+  @Override
+  public void commitOffsetsAsync(Map<KafkaPartition, Long> partitionOffsets) {
+    Map<TopicPartition, OffsetAndMetadata> offsets = partitionOffsets.entrySet().stream().collect(Collectors.toMap(e -> new TopicPartition(e.getKey().getTopicName(),e.getKey().getId()), e -> new OffsetAndMetadata(e.getValue())));
+    consumer.commitAsync(offsets, new OffsetCommitCallback() {
+      @Override
+      public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+        if(exception != null) {
+          log.error("Exception while committing offsets " + offsets, exception);
+          return;
+        }
+      }
+    });
+  }
+
+  /**
+   * Commit offsets to Kafka synchronously
+   */
+  @Override
+  public void commitOffsetsSync(Map<KafkaPartition, Long> partitionOffsets) {
+    Map<TopicPartition, OffsetAndMetadata> offsets = partitionOffsets.entrySet().stream().collect(Collectors.toMap(e -> new TopicPartition(e.getKey().getTopicName(),e.getKey().getId()), e -> new OffsetAndMetadata(e.getValue())));
+    consumer.commitSync(offsets);
+  }
+
+  /**
+   * Returns the last committed offset for a {@link KafkaPartition}.
+   * @param partition the partition to look up
+   * @return the last committed offset, or -1 if no offset has been committed for the partition
+   */
+  @Override
+  public long committed(KafkaPartition partition) {
+    OffsetAndMetadata offsetAndMetadata =  consumer.committed(new TopicPartition(partition.getTopicName(), partition.getId()));
+    return offsetAndMetadata != null ? offsetAndMetadata.offset() : -1L;
+  }
+
+  /**
+   * Converts a {@link KafkaMetric} instance to a {@link Metric}.
+   * @param kafkaMetric the Kafka metric to convert
+   * @return a {@link Gauge} backed by the Kafka metric's current value
+   */
+  private Metric kafkaToCodaHaleMetric(final KafkaMetric kafkaMetric) {
+    if (log.isDebugEnabled()) {
+      log.debug("Processing a metric change for {}", kafkaMetric.metricName().toString());
+    }
+    Gauge<Object> gauge = kafkaMetric::metricValue;
+    return gauge;
+  }
+
+  private String canonicalMetricName(KafkaMetric kafkaMetric) {
+    MetricName name = kafkaMetric.metricName();
+    return canonicalMetricName(name.group(), name.tags().values(), name.name());
+  }
+
+  @Override
+  public void close() throws IOException {
+    this.consumer.close();
+  }
+
+  private static final Function<PartitionInfo, KafkaPartition> PARTITION_INFO_TO_KAFKA_PARTITION =
+      new Function<PartitionInfo, KafkaPartition>() {
+        @Override
+        public KafkaPartition apply(@Nonnull PartitionInfo partitionInfo) {
+          return new KafkaPartition.Builder().withId(partitionInfo.partition()).withTopicName(partitionInfo.topic())
+              .withLeaderId(partitionInfo.leader().id())
+              .withLeaderHostAndPort(partitionInfo.leader().host(), partitionInfo.leader().port()).build();
+        }
+      };
+
+  /**
+   * A factory class to instantiate {@link Kafka1ConsumerClient}
+   */
+  public static class Factory implements GobblinKafkaConsumerClientFactory {
+    @SuppressWarnings("rawtypes")
+    @Override
+    public GobblinKafkaConsumerClient create(Config config) {
+      return new Kafka1ConsumerClient(config);
+    }
+  }
+
+  /**
+   * A record returned by {@link Kafka1ConsumerClient}
+   *
+   * @param <K> Message key type
+   * @param <V> Message value type
+   */
+  @EqualsAndHashCode(callSuper = true)
+  @ToString
+  public static class Kafka1ConsumerRecord<K, V> extends BaseKafkaConsumerRecord implements
+      DecodeableKafkaRecord<K, V> {
+    private final ConsumerRecord<K, V> consumerRecord;
+
+    public Kafka1ConsumerRecord(ConsumerRecord<K, V> consumerRecord) {
+      // Kafka 1.x ConsumerRecords expose the serialized value size directly.
+      super(consumerRecord.offset(), consumerRecord.serializedValueSize(), consumerRecord.topic(), consumerRecord.partition());
+      this.consumerRecord = consumerRecord;
+    }
+
+    @Override
+    public K getKey() {
+      return this.consumerRecord.key();
+    }
+
+    @Override
+    public V getValue() {
+      return this.consumerRecord.value();
+    }
+  }
+}
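
As a rough illustration of the Factory-based construction described in the class javadoc above, the
sketch below builds a Config with the required value-deserializer key. The "kafka.brokers" key and
the broker/deserializer values are assumptions about how AbstractBaseKafkaConsumerClient expects
the broker list to be supplied.

    // Hedged sketch only (not part of this commit).
    import com.google.common.collect.ImmutableMap;
    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
    import org.apache.gobblin.kafka.client.Kafka1ConsumerClient;

    public class Kafka1ConsumerSketch {
      public static void main(String[] args) throws Exception {
        Config config = ConfigFactory.parseMap(ImmutableMap.of(
            "kafka.brokers", "localhost:9092",   // assumed broker-list key, illustrative value
            Kafka1ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
            "org.apache.kafka.common.serialization.ByteArrayDeserializer"));

        GobblinKafkaConsumerClient client = new Kafka1ConsumerClient.Factory().create(config);
        System.out.println("Topics visible to the client: " + client.getTopics().size());
        client.close();
      }
    }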
diff --git a/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/serialize/LiAvroDeserializer.java b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/serialize/LiAvroDeserializer.java
new file mode 100644
index 0000000..fb5269f
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/serialize/LiAvroDeserializer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.serialize;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistry;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+
+/**
+ * The LinkedIn Avro Deserializer (works with records serialized by the {@link LiAvroSerializer})
+ */
+@Slf4j
+public class LiAvroDeserializer extends LiAvroDeserializerBase implements Deserializer<GenericRecord> {
+  public LiAvroDeserializer(KafkaSchemaRegistry<MD5Digest, Schema> schemaRegistry) {
+    super(schemaRegistry);
+  }
+
+  /**
+   * @param topic topic associated with the data
+   * @param data  serialized bytes
+   * @return deserialized object
+   */
+  @Override
+  public GenericRecord deserialize(String topic, byte[] data) {
+    try {
+      return super.deserialize(topic, data);
+    } catch (org.apache.gobblin.kafka.serialize.SerializationException e) {
+      throw new SerializationException("Error during Deserialization", e);
+    }
+  }
+}
diff --git a/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/serialize/LiAvroSerializer.java b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/serialize/LiAvroSerializer.java
new file mode 100644
index 0000000..8332198
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/serialize/LiAvroSerializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.serialize;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Serializer;
+
+
+/**
+ * LinkedIn's implementation of Avro-schema based serialization for Kafka
+ * TODO: Implement this for IndexedRecord not just GenericRecord
+ */
+public class LiAvroSerializer extends LiAvroSerializerBase implements Serializer<GenericRecord> {
+
+  @Override
+  public byte[] serialize(String topic, GenericRecord data) {
+    try {
+      return super.serialize(topic, data);
+    } catch (org.apache.gobblin.kafka.serialize.SerializationException e) {
+      throw new SerializationException(e);
+    }
+  }
+}
diff --git a/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/writer/Kafka1DataWriter.java b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/writer/Kafka1DataWriter.java
new file mode 100644
index 0000000..c533c4b
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/writer/Kafka1DataWriter.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import com.google.common.base.Throwables;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gobblin.configuration.ConfigurationException;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.*;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.*;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+
+/**
+ * Implementation of KafkaWriter that wraps a {@link KafkaProducer}.
+ * This provides at-least-once semantics.
+ * Applications should expect that data may be written to Kafka even if the overall Gobblin job fails.
+ */
+@Slf4j
+public class Kafka1DataWriter<K, V> implements KafkaDataWriter<K, V> {
+
+  public static final WriteResponseMapper<RecordMetadata> WRITE_RESPONSE_WRAPPER =
+      new WriteResponseMapper<RecordMetadata>() {
+
+        @Override
+        public WriteResponse wrap(final RecordMetadata recordMetadata) {
+          return new WriteResponse<RecordMetadata>() {
+            @Override
+            public RecordMetadata getRawResponse() {
+              return recordMetadata;
+            }
+
+            @Override
+            public String getStringResponse() {
+              return recordMetadata.toString();
+            }
+
+            @Override
+            public long bytesWritten() {
+              return -1;
+            }
+          };
+        }
+      };
+
+  private final Producer<K, V> producer;
+  private final String topic;
+  private final KafkaWriterCommonConfig commonConfig;
+
+  public static Producer getKafkaProducer(Properties props) {
+    Object producerObject = KafkaWriterHelper.getKafkaProducer(props);
+    try {
+      Producer producer = (Producer) producerObject;
+      return producer;
+    } catch (ClassCastException e) {
+      log.error("Failed to instantiate Kafka producer " + producerObject.getClass().getName()
+          + " as instance of Producer.class", e);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  public Kafka1DataWriter(Properties props)
+      throws ConfigurationException {
+    this(getKafkaProducer(props), ConfigFactory.parseProperties(props));
+  }
+
+  public Kafka1DataWriter(Producer producer, Config config)
+      throws ConfigurationException {
+    this.topic = config.getString(KafkaWriterConfigurationKeys.KAFKA_TOPIC);
+    provisionTopic(topic, config);
+    this.producer = producer;
+    this.commonConfig = new KafkaWriterCommonConfig(config);
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    log.debug("Close called");
+    this.producer.close();
+  }
+
+  @Override
+  public Future<WriteResponse> write(final V record, final WriteCallback callback) {
+    try {
+      Pair<K, V> keyValuePair = KafkaWriterHelper.getKeyValuePair(record, this.commonConfig);
+      return write(keyValuePair, callback);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create a Kafka write request", e);
+    }
+  }
+
+  public Future<WriteResponse> write(Pair<K, V> keyValuePair, final WriteCallback callback) {
+    try {
+      return new WriteResponseFuture<>(this.producer
+          .send(new ProducerRecord<>(topic, keyValuePair.getKey(), keyValuePair.getValue()), new Callback() {
+            @Override
+            public void onCompletion(final RecordMetadata metadata, Exception exception) {
+              if (exception != null) {
+                callback.onFailure(exception);
+              } else {
+                callback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(metadata));
+              }
+            }
+          }), WRITE_RESPONSE_WRAPPER);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create a Kafka write request", e);
+    }
+  }
+
+  @Override
+  public void flush()
+      throws IOException {
+    this.producer.flush();
+  }
+
+  private void provisionTopic(String topicName, Config config) {
+    String zooKeeperPropKey = KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER;
+    if (!config.hasPath(zooKeeperPropKey)) {
+      log.debug("Topic " + topicName + " is configured without the partition and replication");
+      return;
+    }
+    String zookeeperConnect = config.getString(zooKeeperPropKey);
+    int sessionTimeoutMs = ConfigUtils.getInt(config,
+        KafkaWriterConfigurationKeys.ZOOKEEPER_SESSION_TIMEOUT,
+        KafkaWriterConfigurationKeys.ZOOKEEPER_SESSION_TIMEOUT_DEFAULT);
+    int connectionTimeoutMs = ConfigUtils.getInt(config,
+        KafkaWriterConfigurationKeys.ZOOKEEPER_CONNECTION_TIMEOUT,
+        KafkaWriterConfigurationKeys.ZOOKEEPER_CONNECTION_TIMEOUT_DEFAULT);
+
+    // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then
+    // createTopic() will only seem to work (it will return without error). The topic will exist
+    // only in ZooKeeper and will be returned when listing topics, but Kafka itself does not
+    // create the topic.
+    ZkClient zkClient =
+        new ZkClient(zookeeperConnect, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$);
+    // Security for Kafka was added in Kafka 0.9.0.0
+    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), false);
+    int partitions = ConfigUtils.getInt(config,
+        KafkaWriterConfigurationKeys.PARTITION_COUNT,
+        KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT);
+    int replication = ConfigUtils.getInt(config,
+        KafkaWriterConfigurationKeys.REPLICATION_COUNT,
+        KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT);
+    Properties topicConfig = new Properties();
+    if (AdminUtils.topicExists(zkUtils, topicName)) {
+      log.debug("Topic {} already exists with replication: {} and partitions: {}", topicName, replication, partitions);
+      boolean deleteTopicIfExists = ConfigUtils.getBoolean(config, KafkaWriterConfigurationKeys.DELETE_TOPIC_IF_EXISTS,
+          KafkaWriterConfigurationKeys.DEFAULT_DELETE_TOPIC_IF_EXISTS);
+      if (!deleteTopicIfExists) {
+        return;
+      } else {
+        log.debug("Deleting topic {}", topicName);
+        AdminUtils.deleteTopic(zkUtils, topicName);
+      }
+    }
+    AdminUtils.createTopic(zkUtils, topicName, partitions, replication, topicConfig, RackAwareMode.Disabled$.MODULE$);
+    log.info("Created topic {} with replication: {} and partitions : {}", topicName, replication, partitions);
+  }
+}
diff --git a/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/writer/Kafka1JsonObjectWriterBuilder.java b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/writer/Kafka1JsonObjectWriterBuilder.java
new file mode 100644
index 0000000..4e4e6d0
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/writer/Kafka1JsonObjectWriterBuilder.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import org.apache.gobblin.configuration.ConfigurationException;
+import org.apache.gobblin.kafka.serialize.GsonSerializerBase;
+import org.apache.gobblin.writer.AsyncDataWriter;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.util.Properties;
+
+
+/**
+ * A {@link org.apache.gobblin.writer.DataWriterBuilder} that builds a {@link org.apache.gobblin.writer.DataWriter} to
+ * write {@link JsonObject} to kafka
+ */
+public class Kafka1JsonObjectWriterBuilder extends AbstractKafkaDataWriterBuilder<JsonArray, JsonObject> {
+  private static final String VALUE_SERIALIZER_KEY =
+      KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + KafkaWriterConfigurationKeys.VALUE_SERIALIZER_CONFIG;
+
+  @Override
+  protected AsyncDataWriter<JsonObject> getAsyncDataWriter(Properties props)
+      throws ConfigurationException {
+    props.setProperty(VALUE_SERIALIZER_KEY, KafkaGsonObjectSerializer.class.getName());
+    return new Kafka1DataWriter<>(props);
+  }
+
+  /**
+   * A specific {@link Serializer} that serializes {@link JsonObject} to byte array
+   */
+  public final static class KafkaGsonObjectSerializer extends GsonSerializerBase<JsonObject> implements Serializer<JsonObject> {
+  }
+}
diff --git a/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
new file mode 100644
index 0000000..e8e7fa4
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.configuration.ConfigurationException;
+import org.apache.gobblin.writer.AsyncDataWriter;
+
+import java.util.Properties;
+
+
+/**
+ * Builder that hands back a {@link Kafka1DataWriter}
+ */
+public class KafkaDataWriterBuilder<S, D> extends AbstractKafkaDataWriterBuilder<S, D> {
+  @Override
+  protected AsyncDataWriter<D> getAsyncDataWriter(Properties props)
+      throws ConfigurationException {
+    return new Kafka1DataWriter<>(props);
+  }
+}
diff --git a/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
new file mode 100644
index 0000000..e04a14c
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import com.google.common.base.Optional;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * Establishes a connection to a Kafka cluster and pushes keyed messages to a specified topic.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+@Slf4j
+public class KafkaKeyValueProducerPusher<K, V> implements Pusher<Pair<K, V>> {
+  private final String topic;
+  private final KafkaProducer<K, V> producer;
+  private final Closer closer;
+
+  public KafkaKeyValueProducerPusher(String brokers, String topic, Optional<Config> kafkaConfig) {
+    this.closer = Closer.create();
+
+    this.topic = topic;
+
+    Properties props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+    props.put(ProducerConfig.ACKS_CONFIG, "all");
+    props.put(ProducerConfig.RETRIES_CONFIG, 3);
+    //To guarantee ordered delivery, the maximum in flight requests must be set to 1.
+    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
+
+    // add the kafka scoped config. if any of the above are specified then they are overridden
+    if (kafkaConfig.isPresent()) {
+      props.putAll(ConfigUtils.configToProperties(kafkaConfig.get()));
+    }
+
+    this.producer = createProducer(props);
+  }
+
+  public KafkaKeyValueProducerPusher(String brokers, String topic) {
+    this(brokers, topic, Optional.absent());
+  }
+
+  /**
+   * Push all keyed messages to the Kafka topic.
+   *
+   * @param messages List of keyed messages to push to Kafka.
+   */
+  public void pushMessages(List<Pair<K, V>> messages) {
+    for (Pair<K, V> message : messages) {
+      this.producer.send(new ProducerRecord<>(topic, message.getKey(), message.getValue()), (recordMetadata, e) -> {
+        if (e != null) {
+          log.error("Failed to send message to topic {} due to exception: ", topic, e);
+        }
+      });
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    //Call flush() before invoking close() to ensure any buffered messages are immediately sent. This is required
+    //since close() only guarantees delivery of in-flight messages.
+    log.info("Flushing records from producer buffer");
+    this.producer.flush();
+    this.closer.close();
+  }
+
+  /**
+   * Create the Kafka producer.
+   */
+  protected KafkaProducer<K, V> createProducer(Properties props) {
+    return this.closer.register(new KafkaProducer<K, V>(props));
+  }
+}
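
A short hedged example of the keyed pusher defined above; the broker and topic values are
illustrative, and the default StringSerializer/ByteArraySerializer pair configured by the class is
assumed, so the key type is String and the value type is byte[].

    // Hedged sketch only (not part of this commit).
    import java.nio.charset.StandardCharsets;
    import java.util.Collections;

    import org.apache.commons.lang3.tuple.Pair;

    import org.apache.gobblin.metrics.kafka.KafkaKeyValueProducerPusher;

    public class KeyValuePusherSketch {
      public static void main(String[] args) throws Exception {
        KafkaKeyValueProducerPusher<String, byte[]> pusher =
            new KafkaKeyValueProducerPusher<>("localhost:9092", "metrics-topic");
        pusher.pushMessages(Collections.singletonList(
            Pair.of("job-1", "some-metric-payload".getBytes(StandardCharsets.UTF_8))));
        pusher.close();  // flushes buffered messages before closing the producer
      }
    }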
diff --git a/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
new file mode 100644
index 0000000..823b83e
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import com.google.common.base.Optional;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * Establishes a connection to a Kafka cluster and pushes byte array messages to a specified topic.
+ */
+@Slf4j
+public class KafkaProducerPusher implements Pusher<byte[]> {
+
+  private final String topic;
+  private final KafkaProducer<String, byte[]> producer;
+  private final Closer closer;
+
+  public KafkaProducerPusher(String brokers, String topic, Optional<Config> kafkaConfig) {
+    this.closer = Closer.create();
+
+    this.topic = topic;
+
+    Properties props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+    props.put(ProducerConfig.ACKS_CONFIG, "all");
+    props.put(ProducerConfig.RETRIES_CONFIG, 3);
+
+    // add the kafka scoped config. if any of the above are specified then they are overridden
+    if (kafkaConfig.isPresent()) {
+      props.putAll(ConfigUtils.configToProperties(kafkaConfig.get()));
+    }
+
+    this.producer = createProducer(props);
+  }
+
+  public KafkaProducerPusher(String brokers, String topic) {
+    this(brokers, topic, Optional.absent());
+  }
+
+  /**
+   * Push all byte array messages to the Kafka topic.
+   *
+   * @param messages List of byte array messages to push to Kafka.
+   */
+  public void pushMessages(List<byte[]> messages) {
+    for (byte[] message : messages) {
+      producer.send(new ProducerRecord<>(topic, message), (recordMetadata, e) -> {
+        if (e != null) {
+          log.error("Failed to send message to topic {} due to exception: ", topic, e);
+        }
+      });
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    //Call flush() before invoking close() to ensure any buffered messages are immediately sent. This is required
+    //since close() only guarantees delivery of in-flight messages.
+    log.info("Flushing records from producer buffer");
+    this.producer.flush();
+    this.closer.close();
+  }
+
+  /**
+   * Create the Kafka producer.
+   */
+  protected KafkaProducer<String, byte[]> createProducer(Properties props) {
+    return this.closer.register(new KafkaProducer<String, byte[]>(props));
+  }
+}
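
Similarly, a hedged example of the byte[] pusher, this time passing an extra producer override
through the optional Config argument; the broker, topic, and compression setting are illustrative.

    // Hedged sketch only (not part of this commit).
    import java.nio.charset.StandardCharsets;
    import java.util.Collections;

    import com.google.common.base.Optional;
    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    import org.apache.gobblin.metrics.kafka.KafkaProducerPusher;

    public class ProducerPusherSketch {
      public static void main(String[] args) throws Exception {
        // Any keys placed in this Config override the defaults set by the pusher.
        Config overrides = ConfigFactory.parseMap(Collections.singletonMap("compression.type", "gzip"));
        KafkaProducerPusher pusher =
            new KafkaProducerPusher("localhost:9092", "metrics-topic", Optional.of(overrides));
        pusher.pushMessages(Collections.singletonList("payload".getBytes(StandardCharsets.UTF_8)));
        pusher.close();
      }
    }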
diff --git a/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/KafkaClusterTestBase.java b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/KafkaClusterTestBase.java
new file mode 100644
index 0000000..238e11e
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/KafkaClusterTestBase.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.ZKStringSerializer$;
+import kafka.zk.EmbeddedZookeeper;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.gobblin.test.TestUtils;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+public class KafkaClusterTestBase extends KafkaTestBase {
+
+  int clusterCount;
+  EmbeddedZookeeper _zkServer;
+  String _zkConnectString;
+  ZkClient _zkClient;
+  List<KafkaServer> kafkaBrokerList = new ArrayList<KafkaServer>();
+  List<Integer> kafkaBrokerPortList = new ArrayList<Integer>();
+
+  public KafkaClusterTestBase(int clusterCount) throws InterruptedException, RuntimeException {
+    super();
+    this.clusterCount = clusterCount;
+  }
+
+  public void startCluster() {
+    // Start Zookeeper.
+    _zkServer = new EmbeddedZookeeper();
+    _zkConnectString = "127.0.0.1:" + _zkServer.port();
+    _zkClient = new ZkClient(_zkConnectString, 30000, 30000, ZKStringSerializer$.MODULE$);
+    // Start Kafka Cluster.
+    for (int i = 0; i < clusterCount; i++) {
+      KafkaServer _kafkaServer = createKafkaServer(i, _zkConnectString);
+      kafkaBrokerList.add(_kafkaServer);
+    }
+  }
+
+  public void stopCluster() {
+    Iterator<KafkaServer> iter = kafkaBrokerList.iterator();
+    while (iter.hasNext()) {
+      KafkaServer server = iter.next();
+      try {
+        server.shutdown();
+      } catch (RuntimeException e) {
+        // Simply Ignore.
+      }
+    }
+  }
+
+  public int getZookeeperPort() {
+    return _zkServer.port();
+  }
+
+  public List<KafkaServer> getBrokerList() {
+    return kafkaBrokerList;
+  }
+
+  public List<Integer> getKafkaBrokerPortList() {
+    return kafkaBrokerPortList;
+  }
+
+
+  public int getClusterCount() {
+    return kafkaBrokerList.size();
+  }
+
+  private KafkaServer createKafkaServer(int brokerId, String _zkConnectString) {
+
+    int _brokerId = brokerId;
+    int _kafkaServerPort = TestUtils.findFreePort();
+    Properties props = kafka.utils.TestUtils.createBrokerConfig(
+        _brokerId,
+        _zkConnectString,
+        kafka.utils.TestUtils.createBrokerConfig$default$3(),
+        kafka.utils.TestUtils.createBrokerConfig$default$4(),
+        _kafkaServerPort,
+        kafka.utils.TestUtils.createBrokerConfig$default$6(),
+        kafka.utils.TestUtils.createBrokerConfig$default$7(),
+        kafka.utils.TestUtils.createBrokerConfig$default$8(),
+        kafka.utils.TestUtils.createBrokerConfig$default$9(),
+        kafka.utils.TestUtils.createBrokerConfig$default$10(),
+        kafka.utils.TestUtils.createBrokerConfig$default$11(),
+        kafka.utils.TestUtils.createBrokerConfig$default$12(),
+        kafka.utils.TestUtils.createBrokerConfig$default$13(),
+        kafka.utils.TestUtils.createBrokerConfig$default$14(),
+        kafka.utils.TestUtils.createBrokerConfig$default$15(),
+        kafka.utils.TestUtils.createBrokerConfig$default$16(),
+        kafka.utils.TestUtils.createBrokerConfig$default$17(),
+        kafka.utils.TestUtils.createBrokerConfig$default$18()
+    );
+    KafkaConfig config = new KafkaConfig(props);
+    Time mock = new MockTime();
+    KafkaServer _kafkaServer = kafka.utils.TestUtils.createServer(config, mock);
+    kafkaBrokerPortList.add(_kafkaServerPort);
+    return _kafkaServer;
+  }
+
+  public String getBootServersList() {
+    String bootServerString = "";
+    Iterator<Integer> ports = kafkaBrokerPortList.iterator();
+    while (ports.hasNext()) {
+      Integer port = ports.next();
+      bootServerString = bootServerString + "localhost:" + port + ",";
+    }
+    bootServerString = bootServerString.substring(0, bootServerString.length() - 1);
+    return bootServerString;
+  }
+}
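
The multi-broker test base above can be driven directly from a test; a hedged sketch (the broker
count is illustrative):

    // Hedged sketch only (not part of this commit).
    import org.apache.gobblin.kafka.KafkaClusterTestBase;

    public class ClusterTestSketch {
      public static void main(String[] args) throws Exception {
        KafkaClusterTestBase cluster = new KafkaClusterTestBase(3);  // three embedded brokers
        cluster.startCluster();
        // Comma-separated "localhost:<port>" list, usable as bootstrap.servers in writer tests.
        String bootstrapServers = cluster.getBootServersList();
        System.out.println("Embedded cluster listening at " + bootstrapServers);
        cluster.stopCluster();
      }
    }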
diff --git a/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/KafkaTestBase.java b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/KafkaTestBase.java
new file mode 100644
index 0000000..d83500d
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/KafkaTestBase.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka;
+
+import com.google.common.collect.ImmutableMap;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+import lombok.extern.slf4j.Slf4j;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.gobblin.test.TestUtils;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * A package-private class for starting a suite of servers for Kafka tests.
+ * Calls to start and shutdown are reference counted, so that the suite is started and shut down in pairs.
+ * A suite of servers (ZooKeeper, Kafka, etc.) will be started just once per process.
+ */
+@Slf4j
+class KafkaServerSuite {
+  static KafkaServerSuite _instance;
+  private final int _kafkaServerPort;
+  private final AtomicInteger _numStarted;
+  private int _brokerId = 0;
+  private EmbeddedZookeeper _zkServer;
+  private ZkClient _zkClient;
+  private KafkaServer _kafkaServer;
+  private String _zkConnectString;
+  private KafkaServerSuite() {
+    _kafkaServerPort = TestUtils.findFreePort();
+    _zkConnectString = "UNINITIALIZED_HOST_PORT";
+    _numStarted = new AtomicInteger(0);
+  }
+
+  static KafkaServerSuite getInstance() {
+    if (null == _instance) {
+      _instance = new KafkaServerSuite();
+      return _instance;
+    } else {
+      return _instance;
+    }
+  }
+
+  public ZkClient getZkClient() {
+    return _zkClient;
+  }
+
+  public KafkaServer getKafkaServer() {
+    return _kafkaServer;
+  }
+
+  public int getKafkaServerPort() {
+    return _kafkaServerPort;
+  }
+
+  public String getZkConnectString() {
+    return _zkConnectString;
+  }
+
+  void start()
+      throws RuntimeException {
+    if (_numStarted.incrementAndGet() == 1) {
+      log.warn("Starting up Kafka server suite. Zk at " + _zkConnectString + "; Kafka server at " + _kafkaServerPort);
+      _zkServer = new EmbeddedZookeeper();
+      _zkConnectString = "127.0.0.1:" + _zkServer.port();
+      _zkClient = new ZkClient(_zkConnectString, 30000, 30000, ZKStringSerializer$.MODULE$);
+
+      Properties props = kafka.utils.TestUtils.createBrokerConfig(
+          _brokerId,
+          _zkConnectString,
+          kafka.utils.TestUtils.createBrokerConfig$default$3(),
+          kafka.utils.TestUtils.createBrokerConfig$default$4(),
+          _kafkaServerPort,
+          kafka.utils.TestUtils.createBrokerConfig$default$6(),
+          kafka.utils.TestUtils.createBrokerConfig$default$7(),
+          kafka.utils.TestUtils.createBrokerConfig$default$8(),
+          kafka.utils.TestUtils.createBrokerConfig$default$9(),
+          kafka.utils.TestUtils.createBrokerConfig$default$10(),
+          kafka.utils.TestUtils.createBrokerConfig$default$11(),
+          kafka.utils.TestUtils.createBrokerConfig$default$12(),
+          kafka.utils.TestUtils.createBrokerConfig$default$13(),
+          kafka.utils.TestUtils.createBrokerConfig$default$14(),
+          kafka.utils.TestUtils.createBrokerConfig$default$15(),
+          kafka.utils.TestUtils.createBrokerConfig$default$16(),
+          kafka.utils.TestUtils.createBrokerConfig$default$17(),
+          kafka.utils.TestUtils.createBrokerConfig$default$18()
+      );
+
+
+      KafkaConfig config = new KafkaConfig(props);
+      Time mock = new MockTime();
+      _kafkaServer = kafka.utils.TestUtils.createServer(config, mock);
+    } else {
+      log.info("Kafka server suite already started... continuing");
+    }
+  }
+
+
+  void shutdown() {
+    if (_numStarted.decrementAndGet() == 0) {
+      log.info("Shutting down Kafka server suite");
+      _kafkaServer.shutdown();
+      _zkClient.close();
+      _zkServer.shutdown();
+    } else {
+      log.info("Kafka server suite still in use ... not shutting down yet");
+    }
+  }
+
+}
+
+class KafkaConsumerSuite {
+  private final ConsumerConnector _consumer;
+  private final KafkaStream<byte[], byte[]> _stream;
+  private final ConsumerIterator<byte[], byte[]> _iterator;
+  private final String _topic;
+
+  KafkaConsumerSuite(String zkConnectString, String topic) {
+    _topic = topic;
+    Properties consumeProps = new Properties();
+    consumeProps.put("zookeeper.connect", zkConnectString);
+    consumeProps.put("group.id", _topic + "-" + System.nanoTime());
+    consumeProps.put("zookeeper.session.timeout.ms", "10000");
+    consumeProps.put("zookeeper.sync.time.ms", "10000");
+    consumeProps.put("auto.commit.interval.ms", "10000");
+    consumeProps.put("_consumer.timeout.ms", "10000");
+
+    _consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumeProps));
+
+    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
+        _consumer.createMessageStreams(ImmutableMap.of(this._topic, 1));
+    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(this._topic);
+    _stream = streams.get(0);
+    _iterator = _stream.iterator();
+  }
+
+  void shutdown() {
+    _consumer.shutdown();
+  }
+
+  public ConsumerIterator<byte[], byte[]> getIterator() {
+    return _iterator;
+  }
+}
+
+/**
+ * A helper class for testing against Kafka.
+ * A suite of servers (Zk, Kafka, etc.) is started just once per process.
+ * A consumer and its iterator are created per provisioned topic, one instance per topic.
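+ *
+ * A minimal usage sketch (the topic name below is only an example):
+ * <pre>
+ *   KafkaTestBase kafkaTestHelper = new KafkaTestBase();
+ *   kafkaTestHelper.startServers();
+ *   kafkaTestHelper.provisionTopic("myTestTopic");
+ *   // produce to "127.0.0.1:" + kafkaTestHelper.getKafkaServerPort(), then consume:
+ *   ConsumerIterator&lt;byte[], byte[]&gt; it = kafkaTestHelper.getIteratorForTopic("myTestTopic");
+ *   kafkaTestHelper.close();
+ * </pre>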
+ */
+public class KafkaTestBase implements Closeable {
+
+  private final KafkaServerSuite _kafkaServerSuite;
+  private final Map<String, KafkaConsumerSuite> _topicConsumerMap;
+
+  public KafkaTestBase() throws InterruptedException, RuntimeException {
+
+    this._kafkaServerSuite = KafkaServerSuite.getInstance();
+    this._topicConsumerMap = new HashMap<>();
+  }
+
+  public synchronized void startServers() {
+    _kafkaServerSuite.start();
+  }
+
+  public void stopServers() {
+    _kafkaServerSuite.shutdown();
+  }
+
+  public void start() {
+    startServers();
+  }
+
+  public void stopClients() throws IOException {
+    for (Map.Entry<String, KafkaConsumerSuite> consumerSuiteEntry : _topicConsumerMap.entrySet()) {
+      consumerSuiteEntry.getValue().shutdown();
+      AdminUtils.deleteTopic(ZkUtils.apply(_kafkaServerSuite.getZkClient(), false),
+          consumerSuiteEntry.getKey());
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    stopClients();
+    stopServers();
+  }
+
+  public void provisionTopic(String topic) {
+    if (_topicConsumerMap.containsKey(topic)) {
+      // already provisioned: nothing to do
+      return;
+    }
+    AdminUtils.createTopic(ZkUtils.apply(_kafkaServerSuite.getZkClient(), false),
+        topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+
+    List<KafkaServer> servers = new ArrayList<>();
+    servers.add(_kafkaServerSuite.getKafkaServer());
+    kafka.utils.TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 5000);
+    KafkaConsumerSuite consumerSuite = new KafkaConsumerSuite(_kafkaServerSuite.getZkConnectString(), topic);
+    _topicConsumerMap.put(topic, consumerSuite);
+  }
+
+
+  public ConsumerIterator<byte[], byte[]> getIteratorForTopic(String topic) {
+    if (_topicConsumerMap.containsKey(topic)) {
+      return _topicConsumerMap.get(topic).getIterator();
+    } else {
+      throw new IllegalStateException("Could not find provisioned topic " + topic + "; call provisionTopic first");
+    }
+  }
+
+  public int getKafkaServerPort() {
+    return _kafkaServerSuite.getKafkaServerPort();
+  }
+
+  public String getZkConnectString() {
+    return this._kafkaServerSuite.getZkConnectString();
+  }
+
+}
diff --git a/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClientTest.java b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClientTest.java
new file mode 100644
index 0000000..6587ff7
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClientTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.kafka.client;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Set;
+
+
+public class Kafka1ConsumerClientTest {
+
+  @Test
+  public void testConsume() throws Exception {
+    Config testConfig = ConfigFactory.parseMap(ImmutableMap.of(ConfigurationKeys.KAFKA_BROKERS, "test"));
+    MockConsumer<String, String> consumer = new MockConsumer<String, String>(OffsetResetStrategy.NONE);
+    consumer.assign(Arrays.asList(new TopicPartition("test_topic", 0)));
+
+    HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
+    beginningOffsets.put(new TopicPartition("test_topic", 0), 0L);
+    consumer.updateBeginningOffsets(beginningOffsets);
+
+    ConsumerRecord<String, String> record0 = new ConsumerRecord<>("test_topic", 0, 0L, "key", "value0");
+    ConsumerRecord<String, String> record1 = new ConsumerRecord<>("test_topic", 0, 1L, "key", "value1");
+    ConsumerRecord<String, String> record2 = new ConsumerRecord<>("test_topic", 0, 2L, "key", "value2");
+
+    consumer.addRecord(record0);
+    consumer.addRecord(record1);
+    consumer.addRecord(record2);
+
+    try (Kafka1ConsumerClient<String, String> kafka1Client = new Kafka1ConsumerClient<>(testConfig, consumer)) {
+
+      // Consume from 0 offset
+      Set<KafkaConsumerRecord> consumedRecords =
+          Sets.newHashSet(kafka1Client.consume(new KafkaPartition.Builder().withId(0).withTopicName("test_topic")
+              .build(), 0L, 100L));
+
+      Set<Kafka1ConsumerClient.Kafka1ConsumerRecord<String, String>> expected =
+          ImmutableSet.of(new Kafka1ConsumerClient.Kafka1ConsumerRecord<>(record0),
+              new Kafka1ConsumerClient.Kafka1ConsumerRecord<>(record1), new Kafka1ConsumerClient.Kafka1ConsumerRecord<>(record2));
+      Assert.assertEquals(consumedRecords, expected);
+
+    }
+
+  }
+}
diff --git a/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/writer/ByPassWatcher.java b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/writer/ByPassWatcher.java
new file mode 100644
index 0000000..cda0842
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/writer/ByPassWatcher.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+public class ByPassWatcher implements Watcher {
+
+  @Override
+  public void process(WatchedEvent event) {
+    // No-op: this watcher intentionally ignores all ZooKeeper events.
+  }
+
+}
diff --git a/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/writer/Kafka1DataWriterTest.java b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/writer/Kafka1DataWriterTest.java
new file mode 100644
index 0000000..8a6a7a0
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/writer/Kafka1DataWriterTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import kafka.message.MessageAndMetadata;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.kafka.KafkaTestBase;
+import org.apache.gobblin.kafka.schemareg.ConfigDrivenMd5SchemaRegistry;
+import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
+import org.apache.gobblin.kafka.schemareg.SchemaRegistryException;
+import org.apache.gobblin.kafka.serialize.LiAvroDeserializer;
+import org.apache.gobblin.kafka.serialize.LiAvroSerializer;
+import org.apache.gobblin.kafka.serialize.SerializationException;
+import org.apache.gobblin.test.TestUtils;
+import org.apache.gobblin.writer.WriteCallback;
+import org.apache.gobblin.writer.WriteResponse;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.*;
+
+
+@Slf4j
+public class Kafka1DataWriterTest {
+
+
+  private final KafkaTestBase _kafkaTestHelper;
+
+  public Kafka1DataWriterTest()
+      throws InterruptedException, RuntimeException {
+    _kafkaTestHelper = new KafkaTestBase();
+  }
+
+  @BeforeSuite(alwaysRun = true)
+  public void beforeSuite() {
+    log.warn("Process id = " + ManagementFactory.getRuntimeMXBean().getName());
+
+    _kafkaTestHelper.startServers();
+  }
+
+  @AfterSuite(alwaysRun = true)
+  public void afterSuite()
+      throws IOException {
+    try {
+      _kafkaTestHelper.stopClients();
+    } finally {
+      _kafkaTestHelper.stopServers();
+    }
+  }
+
+  @Test
+  public void testStringSerialization()
+      throws IOException, InterruptedException, ExecutionException {
+    String topic = "testStringSerialization1";
+    _kafkaTestHelper.provisionTopic(topic);
+    Properties props = new Properties();
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers", "127.0.0.1:" + _kafkaTestHelper.getKafkaServerPort());
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    Kafka1DataWriter<String, String> kafka1DataWriter = new Kafka1DataWriter<>(props);
+    String messageString = "foobar";
+    WriteCallback callback = mock(WriteCallback.class);
+    Future<WriteResponse> future;
+
+    try {
+      future = kafka1DataWriter.write(messageString, callback);
+      kafka1DataWriter.flush();
+      verify(callback, times(1)).onSuccess(isA(WriteResponse.class));
+      verify(callback, never()).onFailure(isA(Exception.class));
+      Assert.assertTrue(future.isDone(), "Future should be done");
+      log.info("Write response: " + future.get().getStringResponse());
+      byte[] message = _kafkaTestHelper.getIteratorForTopic(topic).next().message();
+      String messageReceived = new String(message);
+      Assert.assertEquals(messageReceived, messageString);
+    } finally {
+      kafka1DataWriter.close();
+    }
+
+
+  }
+
+  @Test
+  public void testBinarySerialization()
+      throws IOException {
+    String topic = "testBinarySerialization1";
+    _kafkaTestHelper.provisionTopic(topic);
+    Properties props = new Properties();
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers", "127.0.0.1:" + _kafkaTestHelper.getKafkaServerPort());
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+    Kafka1DataWriter<String, byte[]> kafka1DataWriter = new Kafka1DataWriter<>(props);
+    WriteCallback callback = mock(WriteCallback.class);
+    byte[] messageBytes = TestUtils.generateRandomBytes();
+
+    try {
+      kafka1DataWriter.write(messageBytes, callback);
+    } finally {
+      kafka1DataWriter.close();
+    }
+
+    verify(callback, times(1)).onSuccess(isA(WriteResponse.class));
+    verify(callback, never()).onFailure(isA(Exception.class));
+    byte[] message = _kafkaTestHelper.getIteratorForTopic(topic).next().message();
+    Assert.assertEquals(message, messageBytes);
+  }
+
+  @Test
+  public void testAvroSerialization()
+      throws IOException, SchemaRegistryException {
+    String topic = "testAvroSerialization1";
+    _kafkaTestHelper.provisionTopic(topic);
+    Properties props = new Properties();
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers",
+        "127.0.0.1:" + _kafkaTestHelper.getKafkaServerPort());
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer",
+        LiAvroSerializer.class.getName());
+
+    // set up mock schema registry
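+    // (ConfigDrivenMd5SchemaRegistry is assumed here to act as an in-process registry,
+    // so the LiAvroSerializer can resolve schemas without an external schema-registry service.)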
+
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX
+            + KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_CLASS,
+        ConfigDrivenMd5SchemaRegistry.class.getCanonicalName());
+
+    Kafka1DataWriter<String, GenericRecord> kafka1DataWriter = new Kafka1DataWriter<>(props);
+    WriteCallback callback = mock(WriteCallback.class);
+
+    GenericRecord record = TestUtils.generateRandomAvroRecord();
+    try {
+      kafka1DataWriter.write(record, callback);
+    } finally {
+      kafka1DataWriter.close();
+    }
+
+    log.info("Kafka events written");
+
+    verify(callback, times(1)).onSuccess(isA(WriteResponse.class));
+    verify(callback, never()).onFailure(isA(Exception.class));
+
+    byte[] message = _kafkaTestHelper.getIteratorForTopic(topic).next().message();
+
+    log.info("Kafka events read, start to check result... ");
+    ConfigDrivenMd5SchemaRegistry schemaReg = new ConfigDrivenMd5SchemaRegistry(topic, record.getSchema());
+    LiAvroDeserializer deser = new LiAvroDeserializer(schemaReg);
+    GenericRecord receivedRecord = deser.deserialize(topic, message);
+    Assert.assertEquals(record.toString(), receivedRecord.toString());
+  }
+
+
+  @Test
+  public void testKeyedAvroSerialization()
+      throws IOException, SchemaRegistryException, SerializationException {
+    String topic = "testAvroSerialization1";
+    _kafkaTestHelper.provisionTopic(topic);
+    Properties props = new Properties();
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers",
+        "127.0.0.1:" + _kafkaTestHelper.getKafkaServerPort());
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer",
+        LiAvroSerializer.class.getName());
+    props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYED_CONFIG, "true");
+    String keyField = "field1";
+    props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYFIELD_CONFIG, keyField);
+
+
+    // set up mock schema registry
+
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX
+            + KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_CLASS,
+        ConfigDrivenMd5SchemaRegistry.class.getCanonicalName());
+
+    Kafka1DataWriter<String, GenericRecord> kafka1DataWriter = new Kafka1DataWriter<>(props);
+    WriteCallback callback = mock(WriteCallback.class);
+
+    GenericRecord record = TestUtils.generateRandomAvroRecord();
+    try {
+      kafka1DataWriter.write(record, callback);
+    } finally {
+      kafka1DataWriter.close();
+    }
+
+    verify(callback, times(1)).onSuccess(isA(WriteResponse.class));
+    verify(callback, never()).onFailure(isA(Exception.class));
+    MessageAndMetadata<byte[], byte[]> value = _kafkaTestHelper.getIteratorForTopic(topic).next();
+    byte[] key = value.key();
+    byte[] message = value.message();
+    ConfigDrivenMd5SchemaRegistry schemaReg = new ConfigDrivenMd5SchemaRegistry(topic, record.getSchema());
+    LiAvroDeserializer deser = new LiAvroDeserializer(schemaReg);
+    GenericRecord receivedRecord = deser.deserialize(topic, message);
+    Assert.assertEquals(record.toString(), receivedRecord.toString());
+    Assert.assertEquals(new String(key), record.get(keyField));
+  }
+
+  @Test
+  public void testValueSerialization()
+      throws IOException, InterruptedException, SchemaRegistryException {
+    String topic = "testAvroSerialization1";
+    _kafkaTestHelper.provisionTopic(topic);
+    Properties props = new Properties();
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers",
+        "127.0.0.1:" + _kafkaTestHelper.getKafkaServerPort());
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer",
+        "org.apache.kafka.common.serialization.StringSerializer");
+    props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYED_CONFIG, "true");
+    String keyField = "field1";
+    props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYFIELD_CONFIG, keyField);
+    props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_VALUEFIELD_CONFIG, keyField);
+
+
+    // set up mock schema registry
+
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX
+            + KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_CLASS,
+        ConfigDrivenMd5SchemaRegistry.class.getCanonicalName());
+
+    Kafka1DataWriter<String, GenericRecord> kafka1DataWriter = new Kafka1DataWriter<>(props);
+    WriteCallback callback = mock(WriteCallback.class);
+
+    GenericRecord record = TestUtils.generateRandomAvroRecord();
+    try {
+      kafka1DataWriter.write(record, callback);
+    } finally {
+      kafka1DataWriter.close();
+    }
+
+    verify(callback, times(1)).onSuccess(isA(WriteResponse.class));
+    verify(callback, never()).onFailure(isA(Exception.class));
+    MessageAndMetadata<byte[], byte[]> value = _kafkaTestHelper.getIteratorForTopic(topic).next();
+    byte[] key = value.key();
+    byte[] message = value.message();
+    Assert.assertEquals(new String(message), record.get(keyField));
+    Assert.assertEquals(new String(key), record.get(keyField));
+  }
+
+}
diff --git a/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/writer/Kafka1TopicProvisionTest.java b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/writer/Kafka1TopicProvisionTest.java
new file mode 100644
index 0000000..0117282
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/writer/Kafka1TopicProvisionTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.kafka.KafkaClusterTestBase;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.json.JSONObject;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+
+@Slf4j
+public class Kafka1TopicProvisionTest {
+
+  private final KafkaClusterTestBase _kafkaTestHelper;
+  private int testClusterCount = 5;
+
+  public Kafka1TopicProvisionTest()
+      throws InterruptedException, RuntimeException {
+    _kafkaTestHelper = new KafkaClusterTestBase(testClusterCount);
+  }
+
+  @BeforeSuite(alwaysRun = true)
+  public void beforeSuite() {
+    log.info("Process id = " + ManagementFactory.getRuntimeMXBean().getName());
+    _kafkaTestHelper.startCluster();
+  }
+
+  @AfterSuite(alwaysRun = true)
+  public void afterSuite()
+      throws IOException {
+    _kafkaTestHelper.stopCluster();
+  }
+
+  @Test(enabled = false)
+  public void testCluster()
+      throws IOException, InterruptedException, KeeperException {
+    int clusterCount = _kafkaTestHelper.getClusterCount();
+    Assert.assertEquals(clusterCount, testClusterCount);
+    int zkPort = _kafkaTestHelper.getZookeeperPort();
+    String kafkaBrokerPortList = _kafkaTestHelper.getKafkaBrokerPortList().toString();
+    System.out.println("kafkaBrokerPortList : " + kafkaBrokerPortList);
+    ZooKeeper zk = new ZooKeeper("localhost:" + zkPort, 11000, new ByPassWatcher());
+    List<Integer> brokerPortList = new ArrayList<Integer>();
+    List<String> ids = zk.getChildren("/brokers/ids", false);
+    for (String id : ids) {
+      String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null));
+      JSONObject obj = new JSONObject(brokerInfo);
+      int brokerPort = obj.getInt("port");
+      log.info("Found broker port: " + brokerPort);
+      brokerPortList.add(brokerPort);
+    }
+    Assert.assertEquals(brokerPortList, _kafkaTestHelper.getKafkaBrokerPortList());
+  }
+
+  @Test(enabled = false)
+  public void testTopicPartitionCreationCount()
+      throws IOException, InterruptedException, ExecutionException {
+    String topic = "topicPartition4";
+    int clusterCount = _kafkaTestHelper.getClusterCount();
+    int expectedPartitionCount = clusterCount / 2;
+    int zkPort = _kafkaTestHelper.getZookeeperPort();
+    Properties props = new Properties();
+
+    //	Setting Topic Properties
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
+    props.setProperty(KafkaWriterConfigurationKeys.REPLICATION_COUNT, String.valueOf(clusterCount));
+    props.setProperty(KafkaWriterConfigurationKeys.PARTITION_COUNT, String.valueOf(expectedPartitionCount));
+    props.setProperty(KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER, "localhost:" + zkPort);
+
+    // Setting Producer Properties
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers", _kafkaTestHelper.getBootServersList());
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
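+    // Constructing the writer is expected to provision the topic on the embedded cluster,
+    // using the replication/partition counts and ZooKeeper connect string set above.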
+    Kafka1DataWriter<String, String> kafka1DataWriter = new Kafka1DataWriter<>(props);
+
+    String zookeeperConnect = "localhost:" + _kafkaTestHelper.getZookeeperPort();
+    int sessionTimeoutMs = 10 * 1000;
+    int connectionTimeoutMs = 8 * 1000;
+    // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
+    // createTopic() will only seem to work (it will return without error).  The topic will exist in
+    // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
+    // topic.
+    ZkClient zkClient = new ZkClient(
+        zookeeperConnect,
+        sessionTimeoutMs,
+        connectionTimeoutMs,
+        ZKStringSerializer$.MODULE$);
+    boolean isSecureKafkaCluster = false;
+    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);
+
+    Integer actualPartitionCount = (Integer) zkUtils.getTopicPartitionCount(topic).get();
+    Assert.assertEquals(actualPartitionCount.intValue(), expectedPartitionCount);
+
+  }
+
+  @Test(enabled = false)
+  public void testLiveTopicPartitionCreationCount()
+      throws IOException, InterruptedException, ExecutionException {
+    String liveClusterCount = System.getProperty("live.cluster.count");
+    String liveZookeeper = System.getProperty("live.zookeeper");
+    String liveBroker = System.getProperty("live.broker");
+    String topic = System.getProperty("live.newtopic");
+    String topicReplicationCount = System.getProperty("live.newtopic.replicationCount");
+    String topicPartitionCount = System.getProperty("live.newtopic.partitionCount");
+    if (StringUtils.isEmpty(liveClusterCount)) {
+      // No live cluster configured for this run; nothing to verify.
+      return;
+    }
+    if (StringUtils.isEmpty(topicPartitionCount)) {
+      int clusterCount = Integer.parseInt(liveClusterCount);
+      clusterCount--;
+      int partitionCount = clusterCount / 2;
+      topicReplicationCount = String.valueOf(clusterCount);
+      topicPartitionCount = String.valueOf(partitionCount);
+    }
+
+    Properties props = new Properties();
+    //	Setting Topic Properties
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
+    props.setProperty(KafkaWriterConfigurationKeys.REPLICATION_COUNT, topicReplicationCount);
+    props.setProperty(KafkaWriterConfigurationKeys.PARTITION_COUNT, topicPartitionCount);
+    props.setProperty(KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER, liveZookeeper);
+    // Setting Producer Properties
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers", liveBroker);
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+    Kafka1DataWriter<String, String> kafka1DataWriter = new Kafka1DataWriter<>(props);
+    int sessionTimeoutMs = 10 * 1000;
+    int connectionTimeoutMs = 8 * 1000;
+    // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
+    // createTopic() will only seem to work (it will return without error).  The topic will exist in
+    // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
+    // topic.
+    ZkClient zkClient = new ZkClient(
+        liveZookeeper,
+        sessionTimeoutMs,
+        connectionTimeoutMs,
+        ZKStringSerializer$.MODULE$);
+    boolean isSecureKafkaCluster = false;
+    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(liveZookeeper), isSecureKafkaCluster);
+
+    Properties config = new Properties();
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, liveBroker);
+    AdminClient adminClient = AdminClient.create(config);
+    DescribeTopicsResult describer = adminClient.describeTopics(Collections.singletonList(topic));
+
+    // Note: AdminUtils.fetchTopicMetadataFromZk is deprecated after 0.10.0. Please consider using AdminClient
+    // to fetch topic config, or using ZKUtils.
+    Assert.assertEquals(describer.values().get(topic).get().partitions().size(), Integer.parseInt(topicPartitionCount));
+
+  }
+
+}
diff --git a/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java
new file mode 100644
index 0000000..07b1905
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
+import kafka.consumer.ConsumerIterator;
+import kafka.message.MessageAndMetadata;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gobblin.kafka.KafkaTestBase;
+import org.apache.gobblin.metrics.kafka.KafkaKeyValueProducerPusher;
+import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+
+
+/**
+ * Test {@link KafkaKeyValueProducerPusher}.
+ */
+public class KafkaKeyValueProducerPusherTest {
+  public static final String TOPIC = KafkaKeyValueProducerPusherTest.class.getSimpleName();
+
+  private KafkaTestBase kafkaTestHelper;
+
+  @BeforeClass
+  public void setup() throws Exception {
+    kafkaTestHelper = new KafkaTestBase();
+    kafkaTestHelper.startServers();
+
+    kafkaTestHelper.provisionTopic(TOPIC);
+  }
+
+  @Test
+  public void test() throws IOException {
+    // Test that the scoped config overrides the generic config
+    Pusher pusher = new KafkaKeyValueProducerPusher<String, byte[]>("127.0.0.1:dummy", TOPIC,
+        Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
+            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + this.kafkaTestHelper.getKafkaServerPort()))));
+
+    String msg1 = "msg1";
+    String msg2 = "msg2";
+
+    pusher.pushMessages(Lists.newArrayList(Pair.of("key1", msg1.getBytes()), Pair.of("key2", msg2.getBytes())));
+
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
+
+    Assert.assertTrue(iterator.hasNext());
+
+    MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
+
+    Assert.assertEquals(new String(messageAndMetadata.key()), "key1");
+    Assert.assertEquals(new String(messageAndMetadata.message()), msg1);
+    Assert.assertTrue(iterator.hasNext());
+
+    messageAndMetadata = iterator.next();
+    Assert.assertEquals(new String(messageAndMetadata.key()), "key2");
+    Assert.assertEquals(new String(messageAndMetadata.message()), msg2);
+
+    pusher.close();
+  }
+
+  @AfterClass
+  public void after() {
+    try {
+      this.kafkaTestHelper.close();
+    } catch (Exception e) {
+      System.err.println("Failed to close Kafka server.");
+    }
+  }
+}
diff --git a/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java
new file mode 100644
index 0000000..c369cdc
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
+import kafka.consumer.ConsumerIterator;
+import org.apache.gobblin.kafka.KafkaTestBase;
+import org.apache.gobblin.metrics.kafka.KafkaProducerPusher;
+import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+
+
+/**
+ * Test {@link org.apache.gobblin.metrics.kafka.KafkaProducerPusher}.
+ */
+public class KafkaProducerPusherTest {
+  public static final String TOPIC = KafkaProducerPusherTest.class.getSimpleName();
+
+  private KafkaTestBase kafkaTestHelper;
+
+  @BeforeClass
+  public void setup() throws Exception {
+    kafkaTestHelper = new KafkaTestBase();
+    kafkaTestHelper.startServers();
+
+    kafkaTestHelper.provisionTopic(TOPIC);
+  }
+
+  @Test
+  public void test() throws IOException {
+    // Test that the scoped config overrides the generic config
+    Pusher pusher = new KafkaProducerPusher("127.0.0.1:dummy", TOPIC, Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
+        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + this.kafkaTestHelper.getKafkaServerPort()))));
+
+    String msg1 = "msg1";
+    String msg2 = "msg2";
+
+    pusher.pushMessages(Lists.newArrayList(msg1.getBytes(), msg2.getBytes()));
+
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
+
+    Assert.assertTrue(iterator.hasNext());
+    Assert.assertEquals(new String(iterator.next().message()), msg1);
+    Assert.assertTrue(iterator.hasNext());
+    Assert.assertEquals(new String(iterator.next().message()), msg2);
+
+    pusher.close();
+  }
+
+  @AfterClass
+  public void after() {
+    try {
+      this.kafkaTestHelper.close();
+    } catch (Exception e) {
+      System.err.println("Failed to close Kafka server.");
+    }
+  }
+}
diff --git a/gradle/scripts/defaultBuildProperties.gradle b/gradle/scripts/defaultBuildProperties.gradle
index 8b1ae17..d58fb2a 100644
--- a/gradle/scripts/defaultBuildProperties.gradle
+++ b/gradle/scripts/defaultBuildProperties.gradle
@@ -35,6 +35,7 @@ def BuildProperties BUILD_PROPERTIES = new BuildProperties(project)
     "Java languange compatibility; supported versions: " + JavaVersion.VERSION_1_8))
     .register(new BuildProperty("kafka08Version", "0.8.2.2", "Kafka 0.8 dependencies version"))
     .register(new BuildProperty("kafka09Version", "0.9.0.1", "Kafka 0.9 dependencies version"))
+    .register(new BuildProperty("kafka1Version", "1.1.1", "Kafka 1.1 dependencies version"))
     .register(new BuildProperty("pegasusVersion", "29.6.4", "Pegasus dependencies version"))
     .register(new BuildProperty("publishToMaven", false, "Enable publishing of artifacts to a central Maven repository"))
     .register(new BuildProperty("publishToNexus", false, "Enable publishing of artifacts to Nexus"))
@@ -67,6 +68,7 @@ BUILD_PROPERTIES.ensureDefined('hadoopVersion')
 BUILD_PROPERTIES.ensureDefined('hiveVersion')
 BUILD_PROPERTIES.ensureDefined('kafka08Version')
 BUILD_PROPERTIES.ensureDefined('kafka09Version')
+BUILD_PROPERTIES.ensureDefined('kafka1Version')
 BUILD_PROPERTIES.ensureDefined('pegasusVersion')
 BUILD_PROPERTIES.ensureDefined('salesforceVersion')
 
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index 5f11123..99fe31f 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -85,6 +85,10 @@ ext.externalDependency = [
     "kafka09": "org.apache.kafka:kafka_2.11:" + kafka09Version,
     "kafka09Test": "org.apache.kafka:kafka_2.11:" + kafka09Version + ":test",
     "kafka09Client": "org.apache.kafka:kafka-clients:" + kafka09Version,
+    "kafka1": "org.apache.kafka:kafka_2.11:" + kafka1Version,
+    "kafka1Test": "org.apache.kafka:kafka_2.11:" + kafka1Version + ":test",
+    "kafka1Client": "org.apache.kafka:kafka-clients:" + kafka1Version,
+    "kafka1ClientTest": "org.apache.kafka:kafka-clients:" + kafka1Version + ":test",
     "confluentSchemaRegistryClient": "io.confluent:kafka-schema-registry-client:" + confluentVersion,
     "confluentAvroSerializer": "io.confluent:kafka-avro-serializer:" + confluentVersion,
     "confluentJsonSerializer": "io.confluent:kafka-json-serializer:" + confluentVersion,