You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by sh...@apache.org on 2020/12/10 01:58:34 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1325] Add Kafka
1.x support : Writer only
This is an automated email from the ASF dual-hosted git repository.
shirshanka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 1f08d25 [GOBBLIN-1325] Add Kafka 1.x support : Writer only
1f08d25 is described below
commit 1f08d25e828737798abac750c59b6167abdba108
Author: Hanghang Liu <na...@gmail.com>
AuthorDate: Wed Dec 9 17:54:50 2020 -0800
[GOBBLIN-1325] Add Kafka 1.x support : Writer only
version set to 1.1 currently
module under gobblin-kafka-1
Closes #3163 from hanghangliu/GOBBLIN-1325-add-
---
gobblin-modules/gobblin-kafka-1/build.gradle | 91 ++++++
.../gobblin/kafka/client/Kafka1ConsumerClient.java | 328 +++++++++++++++++++++
.../kafka/serialize/LiAvroDeserializer.java | 50 ++++
.../gobblin/kafka/serialize/LiAvroSerializer.java | 39 +++
.../gobblin/kafka/writer/Kafka1DataWriter.java | 187 ++++++++++++
.../writer/Kafka1JsonObjectWriterBuilder.java | 50 ++++
.../kafka/writer/KafkaDataWriterBuilder.java | 37 +++
.../metrics/kafka/KafkaKeyValueProducerPusher.java | 106 +++++++
.../gobblin/metrics/kafka/KafkaProducerPusher.java | 101 +++++++
.../apache/gobblin/kafka/KafkaClusterTestBase.java | 130 ++++++++
.../org/apache/gobblin/kafka/KafkaTestBase.java | 252 ++++++++++++++++
.../kafka/client/Kafka1ConsumerClientTest.java | 73 +++++
.../apache/gobblin/kafka/writer/ByPassWatcher.java | 30 ++
.../gobblin/kafka/writer/Kafka1DataWriterTest.java | 261 ++++++++++++++++
.../kafka/writer/Kafka1TopicProvisionTest.java | 189 ++++++++++++
.../reporter/KafkaKeyValueProducerPusherTest.java | 98 ++++++
.../metrics/reporter/KafkaProducerPusherTest.java | 88 ++++++
gradle/scripts/defaultBuildProperties.gradle | 2 +
gradle/scripts/dependencyDefinitions.gradle | 4 +
19 files changed, 2116 insertions(+)
diff --git a/gobblin-modules/gobblin-kafka-1/build.gradle b/gobblin-modules/gobblin-kafka-1/build.gradle
new file mode 100644
index 0000000..54800df
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/build.gradle
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Build script for the Kafka 1.x module (per the commit message, the Kafka version is currently set to 1.1).
+apply plugin: 'java'
+
+dependencies {
+ // Gobblin modules this module builds on.
+ compile project(":gobblin-modules:gobblin-kafka-common")
+ compile project(":gobblin-core-base")
+ compile project(":gobblin-utility")
+ compile project(":gobblin-metrics-libs:gobblin-metrics")
+
+ // Third-party libraries; listed explicitly because the compile configuration is non-transitive (see below).
+ compile externalDependency.avro
+ compile externalDependency.jacksonCore
+ compile externalDependency.jacksonMapper
+ compile externalDependency.commonsHttpClient
+ compile externalDependency.commonsPool
+ compile externalDependency.commonsLang3
+ compile externalDependency.guava
+ compile externalDependency.slf4j
+ compile externalDependency.httpclient
+ compile externalDependency.httpcore
+ // NOTE(review): the jmxri/jmxtools/jms exclusions are presumably there because these legacy transitive
+ // artifacts are not resolvable from public repositories — confirm before removing.
+ compile(externalDependency.kafka1) {
+ exclude group: "com.sun.jmx", module: "jmxri"
+ exclude group: "com.sun.jdmk", module: "jmxtools"
+ exclude group: "javax.jms", module: "jms"
+ }
+ compile externalDependency.kafka1Client
+ compile externalDependency.lombok
+ compile externalDependency.metricsCore
+ compile externalDependency.typesafeConfig
+ compile externalDependency.findBugsAnnotations
+
+ // Needed only on the runtime classpath, not at compile time.
+ runtime externalDependency.confluentAvroSerializer
+ runtime externalDependency.confluentJsonSerializer
+ runtime externalDependency.confluentSchemaRegistryClient
+ runtime externalDependency.protobuf
+
+ // Test-only dependencies, including the Kafka 1.x test artifacts (same exclusions as the main Kafka dependency).
+ testCompile project(":gobblin-service")
+ testCompile project(":gobblin-modules:gobblin-service-kafka")
+ testCompile project(path: ":gobblin-runtime", configuration: "tests")
+ testCompile project(":gobblin-test-utils")
+ testCompile externalDependency.confluentAvroSerializer
+ testCompile externalDependency.jsonAssert
+ testCompile externalDependency.mockito
+ testCompile externalDependency.testng
+ testCompile(externalDependency.kafka1Test) {
+ exclude group: "com.sun.jmx", module: "jmxri"
+ exclude group: "com.sun.jdmk", module: "jmxtools"
+ exclude group: "javax.jms", module: "jms"
+ }
+ testCompile(externalDependency.kafka1ClientTest) {
+ exclude group: "com.sun.jmx", module: "jmxri"
+ exclude group: "com.sun.jdmk", module: "jmxtools"
+ exclude group: "javax.jms", module: "jms"
+ }
+}
+
+configurations {
+ // Disable transitive resolution for the compile configuration: only the dependencies declared above are used.
+ compile { transitive = false }
+ // Remove xerces dependencies because of versioning issues. Standard JRE implementation should
+ // work. See also http://stackoverflow.com/questions/11677572/dealing-with-xerces-hell-in-java-maven
+ // HADOOP-5254 and MAPREDUCE-5664
+ all*.exclude group: 'xml-apis'
+ all*.exclude group: 'xerces'
+}
+
+test {
+ // Run tests from the repository root.
+ workingDir rootProject.rootDir
+ // Forward the "live.*" settings from the invoking JVM's system properties to the forked test JVM.
+ systemProperty "live.newtopic", System.getProperty("live.newtopic")
+ systemProperty "live.newtopic.replicationCount", System.getProperty("live.newtopic.replicationCount")
+ systemProperty "live.newtopic.partitionCount", System.getProperty("live.newtopic.partitionCount")
+ systemProperty "live.cluster.count", System.getProperty("live.cluster.count")
+ systemProperty "live.zookeeper", System.getProperty("live.zookeeper")
+ systemProperty "live.broker", System.getProperty("live.broker")
+}
+
+ext.classification = "library"
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClient.java b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClient.java
new file mode 100644
index 0000000..abc880c
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClient.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.kafka.client;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.KafkaMetric;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+
+/**
+ * A {@link GobblinKafkaConsumerClient} backed by the Kafka 1.1 {@link KafkaConsumer}. Use
+ * {@link Factory#create(Config)} to create new {@link Kafka1ConsumerClient} instances. The {@link Config} used to
+ * create clients must contain the required key {@value #GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY}.
+ *
+ * @param <K> Message key type
+ * @param <V> Message value type
+ */
+@Slf4j
+public class Kafka1ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient {
+
+  private static final String CLIENT_BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
+  private static final String CLIENT_ENABLE_AUTO_COMMIT_KEY = "enable.auto.commit";
+  private static final String CLIENT_SESSION_TIMEOUT_KEY = "session.timeout.ms";
+  private static final String CLIENT_KEY_DESERIALIZER_CLASS_KEY = "key.deserializer";
+  private static final String CLIENT_VALUE_DESERIALIZER_CLASS_KEY = "value.deserializer";
+  private static final String CLIENT_GROUP_ID = "group.id";
+
+  private static final String DEFAULT_ENABLE_AUTO_COMMIT = Boolean.toString(false);
+  public static final String DEFAULT_KEY_DESERIALIZER =
+      "org.apache.kafka.common.serialization.StringDeserializer";
+  private static final String DEFAULT_GROUP_ID = "kafka1";
+
+  public static final String GOBBLIN_CONFIG_KEY_DESERIALIZER_CLASS_KEY = CONFIG_PREFIX
+      + CLIENT_KEY_DESERIALIZER_CLASS_KEY;
+  public static final String GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY = CONFIG_PREFIX
+      + CLIENT_VALUE_DESERIALIZER_CLASS_KEY;
+
+  // Lowest-priority defaults: auto-commit off, String key deserializer and a fixed group id.
+  private static final Config FALLBACK =
+      ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+          .put(CLIENT_ENABLE_AUTO_COMMIT_KEY, DEFAULT_ENABLE_AUTO_COMMIT)
+          .put(CLIENT_KEY_DESERIALIZER_CLASS_KEY, DEFAULT_KEY_DESERIALIZER)
+          .put(CLIENT_GROUP_ID, DEFAULT_GROUP_ID)
+          .build());
+
+  private final Consumer<K, V> consumer;
+
+  /**
+   * Builds a {@link KafkaConsumer} from the Gobblin {@code config}.
+   *
+   * @throws IllegalArgumentException if {@value #GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY} is missing
+   */
+  private Kafka1ConsumerClient(Config config) {
+    super(config);
+    Preconditions.checkArgument(config.hasPath(GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY),
+        "Missing required property " + GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY);
+
+    Properties props = new Properties();
+    props.put(CLIENT_BOOTSTRAP_SERVERS_KEY, Joiner.on(",").join(super.brokers));
+    props.put(CLIENT_SESSION_TIMEOUT_KEY, super.socketTimeoutMillis);
+
+    // grab all the config under "source.kafka" and add the defaults as fallback.
+    Config baseConfig = ConfigUtils.getConfigOrEmpty(config, CONFIG_NAMESPACE).withFallback(FALLBACK);
+    // get the "source.kafka.consumerConfig" config for extra config to pass along to Kafka with a fallback to the
+    // shared config that start with "gobblin.kafka.sharedConfig"
+    Config specificConfig = ConfigUtils.getConfigOrEmpty(baseConfig, CONSUMER_CONFIG).withFallback(
+        ConfigUtils.getConfigOrEmpty(config, ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
+    // The specific config overrides settings in the base config
+    Config scopedConfig = specificConfig.withFallback(baseConfig.withoutPath(CONSUMER_CONFIG));
+    props.putAll(ConfigUtils.configToProperties(scopedConfig));
+
+    this.consumer = new KafkaConsumer<>(props);
+  }
+
+  /**
+   * Wraps an already-constructed {@link Consumer}; no consumer properties are derived from {@code config}.
+   */
+  public Kafka1ConsumerClient(Config config, Consumer<K, V> consumer) {
+    super(config);
+    this.consumer = consumer;
+  }
+
+  /** Lists all topics visible to the consumer together with their partitions. */
+  @Override
+  public List<KafkaTopic> getTopics() {
+    return FluentIterable.from(this.consumer.listTopics().entrySet())
+        .transform(new Function<Entry<String, List<PartitionInfo>>, KafkaTopic>() {
+          @Override
+          public KafkaTopic apply(Entry<String, List<PartitionInfo>> filteredTopicEntry) {
+            return new KafkaTopic(filteredTopicEntry.getKey(), Lists.transform(filteredTopicEntry.getValue(),
+                PARTITION_INFO_TO_KAFKA_PARTITION));
+          }
+        }).toList();
+  }
+
+  /**
+   * Returns the earliest available offset of {@code partition}. This replaces the consumer's current assignment
+   * with this single partition.
+   */
+  @Override
+  public long getEarliestOffset(KafkaPartition partition) {
+    TopicPartition topicPartition = new TopicPartition(partition.getTopicName(), partition.getId());
+    List<TopicPartition> topicPartitionList = Collections.singletonList(topicPartition);
+    this.consumer.assign(topicPartitionList);
+    this.consumer.seekToBeginning(topicPartitionList);
+
+    return this.consumer.position(topicPartition);
+  }
+
+  /**
+   * Returns the latest (end) offset of {@code partition}. This replaces the consumer's current assignment with this
+   * single partition.
+   */
+  @Override
+  public long getLatestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
+    TopicPartition topicPartition = new TopicPartition(partition.getTopicName(), partition.getId());
+    List<TopicPartition> topicPartitionList = Collections.singletonList(topicPartition);
+    this.consumer.assign(topicPartitionList);
+    this.consumer.seekToEnd(topicPartitionList);
+
+    return this.consumer.position(topicPartition);
+  }
+
+  /**
+   * Assigns {@code partition}, seeks to {@code nextOffset} and polls once.
+   *
+   * @return an iterator over the polled records, or {@code null} if {@code nextOffset > maxOffset}
+   */
+  @Override
+  public Iterator<KafkaConsumerRecord> consume(KafkaPartition partition, long nextOffset, long maxOffset) {
+
+    if (nextOffset > maxOffset) {
+      return null;
+    }
+
+    TopicPartition topicPartition = new TopicPartition(partition.getTopicName(), partition.getId());
+    this.consumer.assign(Lists.newArrayList(topicPartition));
+    this.consumer.seek(topicPartition, nextOffset);
+    return consume();
+  }
+
+  /**
+   * Polls the consumer once (waiting at most the configured fetch timeout) and wraps each record in a
+   * {@link Kafka1ConsumerRecord}. Records are wrapped lazily as the returned iterator is consumed.
+   *
+   * @throws RuntimeException wrapping any exception raised by {@code poll}
+   */
+  @Override
+  public Iterator<KafkaConsumerRecord> consume() {
+    try {
+      ConsumerRecords<K, V> consumerRecords = consumer.poll(super.fetchTimeoutMillis);
+
+      return Iterators.transform(consumerRecords.iterator(), input -> new Kafka1ConsumerRecord<>(input));
+    } catch (Exception e) {
+      log.error("Exception on polling records", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Subscribe to a kafka topic
+   * TODO Add multi topic support
+   * @param topic
+   */
+  @Override
+  public void subscribe(String topic) {
+    this.consumer.subscribe(Lists.newArrayList(topic), new NoOpConsumerRebalanceListener());
+  }
+
+  /**
+   * Subscribe to a kafka topic with a {@link GobblinConsumerRebalanceListener}. Kafka rebalance notifications are
+   * translated into {@link KafkaPartition}s before being forwarded to {@code listener}.
+   * TODO Add multi topic support
+   * @param topic
+   */
+  @Override
+  public void subscribe(String topic, GobblinConsumerRebalanceListener listener) {
+    this.consumer.subscribe(Lists.newArrayList(topic), new ConsumerRebalanceListener() {
+      @Override
+      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+        listener.onPartitionsRevoked(toGobblinPartitions(partitions));
+      }
+
+      @Override
+      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+        listener.onPartitionsAssigned(toGobblinPartitions(partitions));
+      }
+    });
+  }
+
+  /** Exposes the consumer's Kafka metrics as Codahale gauges keyed by their canonical Gobblin name. */
+  @Override
+  @SuppressWarnings("unchecked")
+  public Map<String, Metric> getMetrics() {
+    // metrics() is declared with a wildcard value type; this cast assumes every value is a KafkaMetric.
+    Map<MetricName, KafkaMetric> kafkaMetrics = (Map<MetricName, KafkaMetric>) this.consumer.metrics();
+    Map<String, Metric> codaHaleMetricMap = new HashMap<>();
+
+    kafkaMetrics
+        .forEach((key, value) -> codaHaleMetricMap.put(canonicalMetricName(value), kafkaToCodaHaleMetric(value)));
+    return codaHaleMetricMap;
+  }
+
+  /**
+   * Commit offsets to Kafka asynchronously. Failures are logged and otherwise ignored.
+   */
+  @Override
+  public void commitOffsetsAsync(Map<KafkaPartition, Long> partitionOffsets) {
+    consumer.commitAsync(toTopicPartitionOffsets(partitionOffsets), (offsets, exception) -> {
+      if (exception != null) {
+        log.error("Exception while committing offsets " + offsets, exception);
+      }
+    });
+  }
+
+  /**
+   * Commit offsets to Kafka synchronously
+   */
+  @Override
+  public void commitOffsetsSync(Map<KafkaPartition, Long> partitionOffsets) {
+    consumer.commitSync(toTopicPartitionOffsets(partitionOffsets));
+  }
+
+  /**
+   * returns the last committed offset for a KafkaPartition
+   * @param partition
+   * @return last committed offset, or -1 when Kafka reports no committed offset for the partition
+   */
+  @Override
+  public long committed(KafkaPartition partition) {
+    OffsetAndMetadata offsetAndMetadata = consumer.committed(new TopicPartition(partition.getTopicName(), partition.getId()));
+    return offsetAndMetadata != null ? offsetAndMetadata.offset() : -1L;
+  }
+
+  /**
+   * Converts Gobblin partition/offset pairs into the map shape expected by Kafka's commit APIs.
+   */
+  private static Map<TopicPartition, OffsetAndMetadata> toTopicPartitionOffsets(
+      Map<KafkaPartition, Long> partitionOffsets) {
+    return partitionOffsets.entrySet().stream().collect(Collectors.toMap(
+        e -> new TopicPartition(e.getKey().getTopicName(), e.getKey().getId()),
+        e -> new OffsetAndMetadata(e.getValue())));
+  }
+
+  /**
+   * Converts Kafka {@link TopicPartition}s into Gobblin {@link KafkaPartition}s carrying topic name and id only.
+   */
+  private static List<KafkaPartition> toGobblinPartitions(Collection<TopicPartition> partitions) {
+    return partitions.stream()
+        .map(tp -> new KafkaPartition.Builder().withTopicName(tp.topic()).withId(tp.partition()).build())
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Convert a {@link KafkaMetric} instance to a {@link Metric}.
+   * @param kafkaMetric
+   * @return a gauge that reads {@link KafkaMetric#metricValue()} on every access
+   */
+  private Metric kafkaToCodaHaleMetric(final KafkaMetric kafkaMetric) {
+    log.debug("Processing a metric change for {}", kafkaMetric.metricName());
+    Gauge<Object> gauge = kafkaMetric::metricValue;
+    return gauge;
+  }
+
+  private String canonicalMetricName(KafkaMetric kafkaMetric) {
+    MetricName name = kafkaMetric.metricName();
+    return canonicalMetricName(name.group(), name.tags().values(), name.name());
+  }
+
+  @Override
+  public void close() throws IOException {
+    this.consumer.close();
+  }
+
+  /**
+   * Converts a Kafka {@link PartitionInfo} into a Gobblin {@link KafkaPartition}. Kafka reports a {@code null}
+   * leader when the partition currently has no leader; such partitions are built without leader information
+   * instead of failing with a {@link NullPointerException}.
+   */
+  private static final Function<PartitionInfo, KafkaPartition> PARTITION_INFO_TO_KAFKA_PARTITION =
+      new Function<PartitionInfo, KafkaPartition>() {
+        @Override
+        public KafkaPartition apply(@Nonnull PartitionInfo partitionInfo) {
+          KafkaPartition.Builder builder =
+              new KafkaPartition.Builder().withId(partitionInfo.partition()).withTopicName(partitionInfo.topic());
+          if (partitionInfo.leader() != null) {
+            builder = builder.withLeaderId(partitionInfo.leader().id())
+                .withLeaderHostAndPort(partitionInfo.leader().host(), partitionInfo.leader().port());
+          }
+          return builder.build();
+        }
+      };
+
+  /**
+   * A factory class to instantiate {@link Kafka1ConsumerClient}
+   */
+  public static class Factory implements GobblinKafkaConsumerClientFactory {
+    @SuppressWarnings("rawtypes")
+    @Override
+    public GobblinKafkaConsumerClient create(Config config) {
+      return new Kafka1ConsumerClient(config);
+    }
+  }
+
+  /**
+   * A record returned by {@link Kafka1ConsumerClient}
+   *
+   * @param <K> Message key type
+   * @param <V> Message value type
+   */
+  @EqualsAndHashCode(callSuper = true)
+  @ToString
+  public static class Kafka1ConsumerRecord<K, V> extends BaseKafkaConsumerRecord implements
+      DecodeableKafkaRecord<K, V> {
+    private final ConsumerRecord<K, V> consumerRecord;
+
+    public Kafka1ConsumerRecord(ConsumerRecord<K, V> consumerRecord) {
+      // Kafka 1.x consumer records expose the serialized value size, so it is passed through as the record size.
+      super(consumerRecord.offset(), consumerRecord.serializedValueSize(), consumerRecord.topic(),
+          consumerRecord.partition());
+      this.consumerRecord = consumerRecord;
+    }
+
+    @Override
+    public K getKey() {
+      return this.consumerRecord.key();
+    }
+
+    @Override
+    public V getValue() {
+      return this.consumerRecord.value();
+    }
+  }
+}
diff --git a/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/serialize/LiAvroDeserializer.java b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/serialize/LiAvroDeserializer.java
new file mode 100644
index 0000000..fb5269f
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/serialize/LiAvroDeserializer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.serialize;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistry;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+
+/**
+ * The LinkedIn Avro Deserializer (works with records serialized by the {@link LiAvroSerializer})
+ */
+@Slf4j
+public class LiAvroDeserializer extends LiAvroDeserializerBase implements Deserializer<GenericRecord> {
+ public LiAvroDeserializer(KafkaSchemaRegistry<MD5Digest, Schema> schemaRegistry) {
+ super(schemaRegistry);
+ }
+
+ /**
+ * @param topic topic associated with the data
+ * @param data serialized bytes
+ * @return deserialized object
+ */
+ @Override
+ public GenericRecord deserialize(String topic, byte[] data) {
+ try {
+ return super.deserialize(topic, data);
+ } catch (org.apache.gobblin.kafka.serialize.SerializationException e) {
+ throw new SerializationException("Error during Deserialization", e);
+ }
+ }
+}
diff --git a/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/serialize/LiAvroSerializer.java b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/serialize/LiAvroSerializer.java
new file mode 100644
index 0000000..8332198
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/serialize/LiAvroSerializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.serialize;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Serializer;
+
+
+/**
+ * Kafka {@link Serializer} that encodes Avro {@link GenericRecord}s using LinkedIn's Avro-schema based format.
+ * Encoding is delegated to {@link LiAvroSerializerBase}; Gobblin serialization failures are rethrown as
+ * Kafka's {@link SerializationException}.
+ * TODO: Implement this for IndexedRecord not just GenericRecord
+ */
+public class LiAvroSerializer extends LiAvroSerializerBase implements Serializer<GenericRecord> {
+
+  /**
+   * @param topic topic the record is destined for
+   * @param data record to encode
+   * @return the encoded bytes
+   * @throws SerializationException (Kafka's) wrapping the underlying Gobblin failure
+   */
+  @Override
+  public byte[] serialize(String topic, GenericRecord data) {
+    final byte[] encoded;
+    try {
+      encoded = super.serialize(topic, data);
+    } catch (org.apache.gobblin.kafka.serialize.SerializationException cause) {
+      throw new SerializationException(cause);
+    }
+    return encoded;
+  }
+}
diff --git a/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/writer/Kafka1DataWriter.java b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/writer/Kafka1DataWriter.java
new file mode 100644
index 0000000..c533c4b
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/writer/Kafka1DataWriter.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import com.google.common.base.Throwables;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gobblin.configuration.ConfigurationException;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.*;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.*;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+
+/**
+ * Implementation of KafkaWriter that wraps a {@link KafkaProducer}.
+ * This provides at-least once semantics.
+ * Applications should expect data to be possibly written to Kafka even if the overall Gobblin job fails.
+ */
+@Slf4j
+public class Kafka1DataWriter<K, V> implements KafkaDataWriter<K, V> {
+
+ public static final WriteResponseMapper<RecordMetadata> WRITE_RESPONSE_WRAPPER =
+ new WriteResponseMapper<RecordMetadata>() {
+
+ @Override
+ public WriteResponse wrap(final RecordMetadata recordMetadata) {
+ return new WriteResponse<RecordMetadata>() {
+ @Override
+ public RecordMetadata getRawResponse() {
+ return recordMetadata;
+ }
+
+ @Override
+ public String getStringResponse() {
+ return recordMetadata.toString();
+ }
+
+ @Override
+ public long bytesWritten() {
+ return -1;
+ }
+ };
+ }
+ };
+
+ private final Producer<K, V> producer;
+ private final String topic;
+ private final KafkaWriterCommonConfig commonConfig;
+
+ public static Producer getKafkaProducer(Properties props) {
+ Object producerObject = KafkaWriterHelper.getKafkaProducer(props);
+ try {
+ Producer producer = (Producer) producerObject;
+ return producer;
+ } catch (ClassCastException e) {
+ log.error("Failed to instantiate Kafka producer " + producerObject.getClass().getName()
+ + " as instance of Producer.class", e);
+ throw Throwables.propagate(e);
+ }
+ }
+
+ public Kafka1DataWriter(Properties props)
+ throws ConfigurationException {
+ this(getKafkaProducer(props), ConfigFactory.parseProperties(props));
+ }
+
+ public Kafka1DataWriter(Producer producer, Config config)
+ throws ConfigurationException {
+ this.topic = config.getString(KafkaWriterConfigurationKeys.KAFKA_TOPIC);
+ provisionTopic(topic, config);
+ this.producer = producer;
+ this.commonConfig = new KafkaWriterCommonConfig(config);
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ log.debug("Close called");
+ this.producer.close();
+ }
+
+ @Override
+ public Future<WriteResponse> write(final V record, final WriteCallback callback) {
+ try {
+ Pair<K, V> keyValuePair = KafkaWriterHelper.getKeyValuePair(record, this.commonConfig);
+ return write(keyValuePair, callback);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create a Kafka write request", e);
+ }
+ }
+
+ public Future<WriteResponse> write(Pair<K, V> keyValuePair, final WriteCallback callback) {
+ try {
+ return new WriteResponseFuture<>(this.producer
+ .send(new ProducerRecord<>(topic, keyValuePair.getKey(), keyValuePair.getValue()), new Callback() {
+ @Override
+ public void onCompletion(final RecordMetadata metadata, Exception exception) {
+ if (exception != null) {
+ callback.onFailure(exception);
+ } else {
+ callback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(metadata));
+ }
+ }
+ }), WRITE_RESPONSE_WRAPPER);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create a Kafka write request", e);
+ }
+ }
+
+ @Override
+ public void flush()
+ throws IOException {
+ this.producer.flush();
+ }
+
+ private void provisionTopic(String topicName, Config config) {
+ String zooKeeperPropKey = KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER;
+ if (!config.hasPath(zooKeeperPropKey)) {
+ log.debug("Topic " + topicName + " is configured without the partition and replication");
+ return;
+ }
+ String zookeeperConnect = config.getString(zooKeeperPropKey);
+ int sessionTimeoutMs = ConfigUtils.getInt(config,
+ KafkaWriterConfigurationKeys.ZOOKEEPER_SESSION_TIMEOUT,
+ KafkaWriterConfigurationKeys.ZOOKEEPER_SESSION_TIMEOUT_DEFAULT);
+ int connectionTimeoutMs = ConfigUtils.getInt(config,
+ KafkaWriterConfigurationKeys.ZOOKEEPER_CONNECTION_TIMEOUT,
+ KafkaWriterConfigurationKeys.ZOOKEEPER_CONNECTION_TIMEOUT_DEFAULT);
+
+ // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then
+ // createTopic() will only seem to work (it will return without error). The topic will exist in
+ // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
+ // topic.
+ ZkClient zkClient =
+ new ZkClient(zookeeperConnect, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$);
+ // Security for Kafka was added in Kafka 0.9.0.0
+ ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), false);
+ int partitions = ConfigUtils.getInt(config,
+ KafkaWriterConfigurationKeys.PARTITION_COUNT,
+ KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT);
+ int replication = ConfigUtils.getInt(config,
+ KafkaWriterConfigurationKeys.REPLICATION_COUNT,
+ KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT);
+ Properties topicConfig = new Properties();
+ if (AdminUtils.topicExists(zkUtils, topicName)) {
+ log.debug("Topic {} already exists with replication: {} and partitions: {}", topicName, replication, partitions);
+ boolean deleteTopicIfExists = ConfigUtils.getBoolean(config, KafkaWriterConfigurationKeys.DELETE_TOPIC_IF_EXISTS,
+ KafkaWriterConfigurationKeys.DEFAULT_DELETE_TOPIC_IF_EXISTS);
+ if (!deleteTopicIfExists) {
+ return;
+ } else {
+ log.debug("Deleting topic {}", topicName);
+ AdminUtils.deleteTopic(zkUtils, topicName);
+ }
+ }
+ AdminUtils.createTopic(zkUtils, topicName, partitions, replication, topicConfig, RackAwareMode.Disabled$.MODULE$);
+ log.info("Created topic {} with replication: {} and partitions : {}", topicName, replication, partitions);
+ }
+}
diff --git a/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/writer/Kafka1JsonObjectWriterBuilder.java b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/writer/Kafka1JsonObjectWriterBuilder.java
new file mode 100644
index 0000000..4e4e6d0
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/writer/Kafka1JsonObjectWriterBuilder.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import org.apache.gobblin.configuration.ConfigurationException;
+import org.apache.gobblin.kafka.serialize.GsonSerializerBase;
+import org.apache.gobblin.writer.AsyncDataWriter;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.util.Properties;
+
+
+/**
+ * A {@link org.apache.gobblin.writer.DataWriterBuilder} that builds a {@link org.apache.gobblin.writer.DataWriter} to
+ * write {@link JsonObject} to kafka
+ */
+public class Kafka1JsonObjectWriterBuilder extends AbstractKafkaDataWriterBuilder<JsonArray, JsonObject> {
+ private static final String VALUE_SERIALIZER_KEY =
+ KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + KafkaWriterConfigurationKeys.VALUE_SERIALIZER_CONFIG;
+
+ @Override
+ protected AsyncDataWriter<JsonObject> getAsyncDataWriter(Properties props)
+ throws ConfigurationException {
+ props.setProperty(VALUE_SERIALIZER_KEY, KafkaGsonObjectSerializer.class.getName());
+ return new Kafka1DataWriter<>(props);
+ }
+
+ /**
+ * A specific {@link Serializer} that serializes {@link JsonObject} to byte array
+ */
+ public final static class KafkaGsonObjectSerializer extends GsonSerializerBase<JsonObject> implements Serializer<JsonObject> {
+ }
+}
diff --git a/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
new file mode 100644
index 0000000..e8e7fa4
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.configuration.ConfigurationException;
+import org.apache.gobblin.writer.AsyncDataWriter;
+
+import java.util.Properties;
+
+
+/**
+ * Builder that hands back a {@link Kafka1DataWriter}
+ */
+public class KafkaDataWriterBuilder<S, D> extends AbstractKafkaDataWriterBuilder<S, D> {
+ @Override
+ protected AsyncDataWriter<D> getAsyncDataWriter(Properties props)
+ throws ConfigurationException {
+ return new Kafka1DataWriter<>(props);
+ }
+}
diff --git a/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
new file mode 100644
index 0000000..e04a14c
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import com.google.common.base.Optional;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * Establishes a connection to a Kafka cluster and push keyed messages to a specified topic.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+@Slf4j
+public class KafkaKeyValueProducerPusher<K, V> implements Pusher<Pair<K, V>> {
+ private final String topic;
+ private final KafkaProducer<K, V> producer;
+ private final Closer closer;
+
+ public KafkaKeyValueProducerPusher(String brokers, String topic, Optional<Config> kafkaConfig) {
+ this.closer = Closer.create();
+
+ this.topic = topic;
+
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ props.put(ProducerConfig.ACKS_CONFIG, "all");
+ props.put(ProducerConfig.RETRIES_CONFIG, 3);
+ //To guarantee ordered delivery, the maximum in flight requests must be set to 1.
+ props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
+
+ // add the kafka scoped config. if any of the above are specified then they are overridden
+ if (kafkaConfig.isPresent()) {
+ props.putAll(ConfigUtils.configToProperties(kafkaConfig.get()));
+ }
+
+ this.producer = createProducer(props);
+ }
+
+ public KafkaKeyValueProducerPusher(String brokers, String topic) {
+ this(brokers, topic, Optional.absent());
+ }
+
+ /**
+ * Push all keyed messages to the Kafka topic.
+ *
+ * @param messages List of keyed messages to push to Kakfa.
+ */
+ public void pushMessages(List<Pair<K, V>> messages) {
+ for (Pair<K, V> message : messages) {
+ this.producer.send(new ProducerRecord<>(topic, message.getKey(), message.getValue()), (recordMetadata, e) -> {
+ if (e != null) {
+ log.error("Failed to send message to topic {} due to exception: ", topic, e);
+ }
+ });
+ }
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ //Call flush() before invoking close() to ensure any buffered messages are immediately sent. This is required
+ //since close() only guarantees delivery of in-flight messages.
+ log.info("Flushing records from producer buffer");
+ this.producer.flush();
+ this.closer.close();
+ }
+
+ /**
+ * Create the Kafka producer.
+ */
+ protected KafkaProducer<K, V> createProducer(Properties props) {
+ return this.closer.register(new KafkaProducer<K, V>(props));
+ }
+}
diff --git a/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
new file mode 100644
index 0000000..823b83e
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import com.google.common.base.Optional;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * Establish a connection to a Kafka cluster and push byte messages to a specified topic.
+ */
+@Slf4j
+public class KafkaProducerPusher implements Pusher<byte[]> {
+
+ private final String topic;
+ private final KafkaProducer<String, byte[]> producer;
+ private final Closer closer;
+
+ public KafkaProducerPusher(String brokers, String topic, Optional<Config> kafkaConfig) {
+ this.closer = Closer.create();
+
+ this.topic = topic;
+
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ props.put(ProducerConfig.ACKS_CONFIG, "all");
+ props.put(ProducerConfig.RETRIES_CONFIG, 3);
+
+ // add the kafka scoped config. if any of the above are specified then they are overridden
+ if (kafkaConfig.isPresent()) {
+ props.putAll(ConfigUtils.configToProperties(kafkaConfig.get()));
+ }
+
+ this.producer = createProducer(props);
+ }
+
+ public KafkaProducerPusher(String brokers, String topic) {
+ this(brokers, topic, Optional.absent());
+ }
+
+ /**
+ * Push all byte array messages to the Kafka topic.
+ *
+ * @param messages List of byte array messages to push to Kakfa.
+ */
+ public void pushMessages(List<byte[]> messages) {
+ for (byte[] message : messages) {
+ producer.send(new ProducerRecord<>(topic, message), (recordMetadata, e) -> {
+ if (e != null) {
+ log.error("Failed to send message to topic {} due to exception: ", topic, e);
+ }
+ });
+ }
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ //Call flush() before invoking close() to ensure any buffered messages are immediately sent. This is required
+ //since close() only guarantees delivery of in-flight messages.
+ log.info("Flushing records from producer buffer");
+ this.producer.flush();
+ this.closer.close();
+ }
+
+ /**
+ * Create the Kafka producer.
+ */
+ protected KafkaProducer<String, byte[]> createProducer(Properties props) {
+ return this.closer.register(new KafkaProducer<String, byte[]>(props));
+ }
+}
diff --git a/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/KafkaClusterTestBase.java b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/KafkaClusterTestBase.java
new file mode 100644
index 0000000..238e11e
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/KafkaClusterTestBase.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.ZKStringSerializer$;
+import kafka.zk.EmbeddedZookeeper;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.gobblin.test.TestUtils;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+ /**
+  * Test base that starts its own embedded ZooKeeper plus {@code clusterCount} Kafka brokers.
+  *
+  * <p>Call {@link #startCluster()} before use and {@link #stopCluster()} afterwards.
+  */
+ public class KafkaClusterTestBase extends KafkaTestBase {
+
+   int clusterCount;
+   EmbeddedZookeeper _zkServer;
+   String _zkConnectString;
+   ZkClient _zkClient;
+   List<KafkaServer> kafkaBrokerList = new ArrayList<KafkaServer>();
+   List<Integer> kafkaBrokerPortList = new ArrayList<Integer>();
+
+   public KafkaClusterTestBase(int clusterCount) throws InterruptedException, RuntimeException {
+     super();
+     this.clusterCount = clusterCount;
+   }
+
+   /**
+    * Starts ZooKeeper, then {@code clusterCount} brokers with ids {@code 0..clusterCount-1}, each on a free port.
+    */
+   public void startCluster() {
+     // Start Zookeeper.
+     _zkServer = new EmbeddedZookeeper();
+     _zkConnectString = "127.0.0.1:" + _zkServer.port();
+     _zkClient = new ZkClient(_zkConnectString, 30000, 30000, ZKStringSerializer$.MODULE$);
+     // Start Kafka Cluster.
+     for (int i = 0; i < clusterCount; i++) {
+       KafkaServer kafkaServer = createKafkaServer(i, _zkConnectString);
+       kafkaBrokerList.add(kafkaServer);
+     }
+   }
+
+   /**
+    * Best-effort shutdown of every broker, then of the ZooKeeper client and server created by
+    * {@link #startCluster()}. Runtime failures while stopping one component do not prevent the others from
+    * being stopped.
+    */
+   public void stopCluster() {
+     for (KafkaServer server : kafkaBrokerList) {
+       try {
+         server.shutdown();
+       } catch (RuntimeException ignored) {
+         // Best effort: keep stopping the remaining brokers.
+       }
+     }
+     if (_zkClient != null) {
+       try {
+         _zkClient.close();
+       } catch (RuntimeException ignored) {
+         // Best effort: still try to stop the ZooKeeper server.
+       }
+     }
+     if (_zkServer != null) {
+       try {
+         _zkServer.shutdown();
+       } catch (RuntimeException ignored) {
+         // Best effort, consistent with broker shutdown above.
+       }
+     }
+   }
+
+   public int getZookeeperPort() {
+     return _zkServer.port();
+   }
+
+   public List<KafkaServer> getBrokerList() {
+     return kafkaBrokerList;
+   }
+
+   public List<Integer> getKafkaBrokerPortList() {
+     return kafkaBrokerPortList;
+   }
+
+
+   public int getClusterCount() {
+     return kafkaBrokerList.size();
+   }
+
+   /**
+    * Creates and starts one broker on a free port, recording that port in {@link #kafkaBrokerPortList}.
+    */
+   private KafkaServer createKafkaServer(int brokerId, String zkConnectString) {
+     int kafkaServerPort = TestUtils.findFreePort();
+     Properties props = kafka.utils.TestUtils.createBrokerConfig(
+         brokerId,
+         zkConnectString,
+         kafka.utils.TestUtils.createBrokerConfig$default$3(),
+         kafka.utils.TestUtils.createBrokerConfig$default$4(),
+         kafkaServerPort,
+         kafka.utils.TestUtils.createBrokerConfig$default$6(),
+         kafka.utils.TestUtils.createBrokerConfig$default$7(),
+         kafka.utils.TestUtils.createBrokerConfig$default$8(),
+         kafka.utils.TestUtils.createBrokerConfig$default$9(),
+         kafka.utils.TestUtils.createBrokerConfig$default$10(),
+         kafka.utils.TestUtils.createBrokerConfig$default$11(),
+         kafka.utils.TestUtils.createBrokerConfig$default$12(),
+         kafka.utils.TestUtils.createBrokerConfig$default$13(),
+         kafka.utils.TestUtils.createBrokerConfig$default$14(),
+         kafka.utils.TestUtils.createBrokerConfig$default$15(),
+         kafka.utils.TestUtils.createBrokerConfig$default$16(),
+         kafka.utils.TestUtils.createBrokerConfig$default$17(),
+         kafka.utils.TestUtils.createBrokerConfig$default$18()
+     );
+     KafkaConfig config = new KafkaConfig(props);
+     Time mock = new MockTime();
+     KafkaServer kafkaServer = kafka.utils.TestUtils.createServer(config, mock);
+     kafkaBrokerPortList.add(kafkaServerPort);
+     return kafkaServer;
+   }
+
+   /**
+    * Returns a comma-separated {@code localhost:port} list of all started brokers, or an empty string when no
+    * broker has been started.
+    */
+   public String getBootServersList() {
+     StringBuilder bootServers = new StringBuilder();
+     for (Integer port : kafkaBrokerPortList) {
+       if (bootServers.length() > 0) {
+         bootServers.append(',');
+       }
+       bootServers.append("localhost:").append(port);
+     }
+     return bootServers.toString();
+   }
+ }
diff --git a/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/KafkaTestBase.java b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/KafkaTestBase.java
new file mode 100644
index 0000000..d83500d
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/KafkaTestBase.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka;
+
+import com.google.common.collect.ImmutableMap;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+import lombok.extern.slf4j.Slf4j;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.gobblin.test.TestUtils;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * A private class for starting a suite of servers for Kafka
+ * Calls to start and shutdown are reference counted, so that the suite is started and shutdown in pairs.
+ * A suite of servers (Zk, Kafka etc) will be started just once per process
+ */
+@Slf4j
+class KafkaServerSuite {
+ static KafkaServerSuite _instance;
+ private final int _kafkaServerPort;
+ private final AtomicInteger _numStarted;
+ private int _brokerId = 0;
+ private EmbeddedZookeeper _zkServer;
+ private ZkClient _zkClient;
+ private KafkaServer _kafkaServer;
+ private String _zkConnectString;
+ private KafkaServerSuite() {
+ _kafkaServerPort = TestUtils.findFreePort();
+ _zkConnectString = "UNINITIALIZED_HOST_PORT";
+ _numStarted = new AtomicInteger(0);
+ }
+
+ static KafkaServerSuite getInstance() {
+ if (null == _instance) {
+ _instance = new KafkaServerSuite();
+ return _instance;
+ } else {
+ return _instance;
+ }
+ }
+
+ public ZkClient getZkClient() {
+ return _zkClient;
+ }
+
+ public KafkaServer getKafkaServer() {
+ return _kafkaServer;
+ }
+
+ public int getKafkaServerPort() {
+ return _kafkaServerPort;
+ }
+
+ public String getZkConnectString() {
+ return _zkConnectString;
+ }
+
+ void start()
+ throws RuntimeException {
+ if (_numStarted.incrementAndGet() == 1) {
+ log.warn("Starting up Kafka server suite. Zk at " + _zkConnectString + "; Kafka server at " + _kafkaServerPort);
+ _zkServer = new EmbeddedZookeeper();
+ _zkConnectString = "127.0.0.1:" + _zkServer.port();
+ _zkClient = new ZkClient(_zkConnectString, 30000, 30000, ZKStringSerializer$.MODULE$);
+
+ Properties props = kafka.utils.TestUtils.createBrokerConfig(
+ _brokerId,
+ _zkConnectString,
+ kafka.utils.TestUtils.createBrokerConfig$default$3(),
+ kafka.utils.TestUtils.createBrokerConfig$default$4(),
+ _kafkaServerPort,
+ kafka.utils.TestUtils.createBrokerConfig$default$6(),
+ kafka.utils.TestUtils.createBrokerConfig$default$7(),
+ kafka.utils.TestUtils.createBrokerConfig$default$8(),
+ kafka.utils.TestUtils.createBrokerConfig$default$9(),
+ kafka.utils.TestUtils.createBrokerConfig$default$10(),
+ kafka.utils.TestUtils.createBrokerConfig$default$11(),
+ kafka.utils.TestUtils.createBrokerConfig$default$12(),
+ kafka.utils.TestUtils.createBrokerConfig$default$13(),
+ kafka.utils.TestUtils.createBrokerConfig$default$14(),
+ kafka.utils.TestUtils.createBrokerConfig$default$15(),
+ kafka.utils.TestUtils.createBrokerConfig$default$16(),
+ kafka.utils.TestUtils.createBrokerConfig$default$17(),
+ kafka.utils.TestUtils.createBrokerConfig$default$18()
+ );
+
+
+ KafkaConfig config = new KafkaConfig(props);
+ Time mock = new MockTime();
+ _kafkaServer = kafka.utils.TestUtils.createServer(config, mock);
+ } else {
+ log.info("Kafka server suite already started... continuing");
+ }
+ }
+
+
+ void shutdown() {
+ if (_numStarted.decrementAndGet() == 0) {
+ log.info("Shutting down Kafka server suite");
+ _kafkaServer.shutdown();
+ _zkClient.close();
+ _zkServer.shutdown();
+ } else {
+ log.info("Kafka server suite still in use ... not shutting down yet");
+ }
+ }
+
+}
+
+ /**
+  * A ZooKeeper-based (old high-level API) consumer reading a single stream of one topic, with a unique consumer
+  * group per instance.
+  */
+ class KafkaConsumerSuite {
+   private final ConsumerConnector _consumer;
+   private final KafkaStream<byte[], byte[]> _stream;
+   private final ConsumerIterator<byte[], byte[]> _iterator;
+   private final String _topic;
+
+   KafkaConsumerSuite(String zkConnectString, String topic) {
+     _topic = topic;
+     Properties consumeProps = new Properties();
+     consumeProps.put("zookeeper.connect", zkConnectString);
+     // Unique group id so every suite instance reads the topic independently.
+     consumeProps.put("group.id", _topic + "-" + System.nanoTime());
+     consumeProps.put("zookeeper.session.timeout.ms", "10000");
+     consumeProps.put("zookeeper.sync.time.ms", "10000");
+     consumeProps.put("auto.commit.interval.ms", "10000");
+     // Real consumer property name, so a test waiting for a missing message fails after 10s instead of
+     // blocking forever.
+     consumeProps.put("consumer.timeout.ms", "10000");
+
+     _consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumeProps));
+
+     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
+         _consumer.createMessageStreams(ImmutableMap.of(this._topic, 1));
+     List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(this._topic);
+     _stream = streams.get(0);
+     _iterator = _stream.iterator();
+   }
+
+   void shutdown() {
+     _consumer.shutdown();
+   }
+
+   public ConsumerIterator<byte[], byte[]> getIterator() {
+     return _iterator;
+   }
+ }
+
+/**
+ * A Helper class for testing against Kafka
+ * A suite of servers (Zk, Kafka etc) will be started just once per process
+ * Consumer and iterator will be created per instantiation and is one instance per topic.
+ */
+public class KafkaTestBase implements Closeable {
+
+ private final KafkaServerSuite _kafkaServerSuite;
+ private final Map<String, KafkaConsumerSuite> _topicConsumerMap;
+
+ public KafkaTestBase() throws InterruptedException, RuntimeException {
+
+ this._kafkaServerSuite = KafkaServerSuite.getInstance();
+ this._topicConsumerMap = new HashMap<>();
+ }
+
+ public synchronized void startServers() {
+ _kafkaServerSuite.start();
+ }
+
+ public void stopServers() {
+ _kafkaServerSuite.shutdown();
+ }
+
+ public void start() {
+ startServers();
+ }
+
+ public void stopClients() throws IOException {
+ for (Map.Entry<String, KafkaConsumerSuite> consumerSuiteEntry : _topicConsumerMap.entrySet()) {
+ consumerSuiteEntry.getValue().shutdown();
+ AdminUtils.deleteTopic(ZkUtils.apply(_kafkaServerSuite.getZkClient(), false),
+ consumerSuiteEntry.getKey());
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ stopClients();
+ stopServers();
+ }
+
+ public void provisionTopic(String topic) {
+ if (_topicConsumerMap.containsKey(topic)) {
+ // nothing to do: return
+ } else {
+ // provision topic
+ AdminUtils.createTopic(ZkUtils.apply(_kafkaServerSuite.getZkClient(), false),
+ topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+
+ List<KafkaServer> servers = new ArrayList<>();
+ servers.add(_kafkaServerSuite.getKafkaServer());
+ kafka.utils.TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 5000);
+ KafkaConsumerSuite consumerSuite = new KafkaConsumerSuite(_kafkaServerSuite.getZkConnectString(), topic);
+ _topicConsumerMap.put(topic, consumerSuite);
+ }
+ }
+
+
+ public ConsumerIterator<byte[], byte[]> getIteratorForTopic(String topic) {
+ if (_topicConsumerMap.containsKey(topic)) {
+ return _topicConsumerMap.get(topic).getIterator();
+ } else {
+ throw new IllegalStateException("Could not find provisioned topic" + topic + ": call provisionTopic before");
+ }
+ }
+
+ public int getKafkaServerPort() {
+ return _kafkaServerSuite.getKafkaServerPort();
+ }
+
+ public String getZkConnectString() {
+ return this._kafkaServerSuite.getZkConnectString();
+ }
+
+}
diff --git a/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClientTest.java b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClientTest.java
new file mode 100644
index 0000000..6587ff7
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/client/Kafka1ConsumerClientTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.kafka.client;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Set;
+
+
+ public class Kafka1ConsumerClientTest {
+
+   /**
+    * Queues records at offsets 0-2 on a {@link MockConsumer} and checks that consuming offsets 0 to 100 through
+    * {@link Kafka1ConsumerClient} returns exactly those three records, wrapped as
+    * {@link Kafka1ConsumerClient.Kafka1ConsumerRecord}s.
+    */
+   @Test
+   public void testConsume() throws Exception {
+     Config testConfig = ConfigFactory.parseMap(ImmutableMap.of(ConfigurationKeys.KAFKA_BROKERS, "test"));
+     TopicPartition topicPartition = new TopicPartition("test_topic", 0);
+     MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.NONE);
+     consumer.assign(Arrays.asList(topicPartition));
+
+     HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
+     beginningOffsets.put(topicPartition, 0L);
+     consumer.updateBeginningOffsets(beginningOffsets);
+
+     ConsumerRecord<String, String> record0 = new ConsumerRecord<>("test_topic", 0, 0L, "key", "value0");
+     ConsumerRecord<String, String> record1 = new ConsumerRecord<>("test_topic", 0, 1L, "key", "value1");
+     ConsumerRecord<String, String> record2 = new ConsumerRecord<>("test_topic", 0, 2L, "key", "value2");
+
+     consumer.addRecord(record0);
+     consumer.addRecord(record1);
+     consumer.addRecord(record2);
+
+     try (Kafka1ConsumerClient<String, String> kafka1Client = new Kafka1ConsumerClient<>(testConfig, consumer)) {
+
+       // Consume from offset 0; 100 is an upper bound past the last queued record (offset 2).
+       Set<KafkaConsumerRecord> consumedRecords =
+           Sets.newHashSet(kafka1Client.consume(new KafkaPartition.Builder().withId(0).withTopicName("test_topic")
+               .build(), 0L, 100L));
+
+       Set<Kafka1ConsumerClient.Kafka1ConsumerRecord<String, String>> expected =
+           ImmutableSet.of(new Kafka1ConsumerClient.Kafka1ConsumerRecord<>(record0),
+               new Kafka1ConsumerClient.Kafka1ConsumerRecord<>(record1),
+               new Kafka1ConsumerClient.Kafka1ConsumerRecord<>(record2));
+       Assert.assertEquals(consumedRecords, expected);
+     }
+   }
+ }
diff --git a/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/writer/ByPassWatcher.java b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/writer/ByPassWatcher.java
new file mode 100644
index 0000000..cda0842
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/writer/ByPassWatcher.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+ /**
+  * A no-op ZooKeeper {@link Watcher}: every {@link WatchedEvent} it receives is ignored.
+  */
+ public class ByPassWatcher implements Watcher {
+
+   @Override
+   public void process(WatchedEvent event) {
+     // Intentionally empty: notifications are discarded.
+   }
+ }
diff --git a/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/writer/Kafka1DataWriterTest.java b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/writer/Kafka1DataWriterTest.java
new file mode 100644
index 0000000..8a6a7a0
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/writer/Kafka1DataWriterTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import kafka.message.MessageAndMetadata;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.kafka.KafkaTestBase;
+import org.apache.gobblin.kafka.schemareg.ConfigDrivenMd5SchemaRegistry;
+import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
+import org.apache.gobblin.kafka.schemareg.SchemaRegistryException;
+import org.apache.gobblin.kafka.serialize.LiAvroDeserializer;
+import org.apache.gobblin.kafka.serialize.LiAvroSerializer;
+import org.apache.gobblin.kafka.serialize.SerializationException;
+import org.apache.gobblin.test.TestUtils;
+import org.apache.gobblin.writer.WriteCallback;
+import org.apache.gobblin.writer.WriteResponse;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Tests for {@link Kafka1DataWriter} against the local Kafka servers started by {@link KafkaTestBase}.
+ *
+ * <p>Every test writes to its own topic. Results are read back with {@code next()} on the topic's
+ * iterator, so a topic shared between tests could hand one test a record written by another,
+ * depending on execution order.
+ */
+@Slf4j
+public class Kafka1DataWriterTest {
+
+  /** Kafka's {@code StringSerializer} encodes as UTF-8 by default, so decode payloads the same way. */
+  private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;
+
+  private final KafkaTestBase _kafkaTestHelper;
+
+  public Kafka1DataWriterTest()
+      throws InterruptedException, RuntimeException {
+    _kafkaTestHelper = new KafkaTestBase();
+  }
+
+  @BeforeSuite(alwaysRun = true)
+  public void beforeSuite() {
+    log.warn("Process id = " + ManagementFactory.getRuntimeMXBean().getName());
+
+    _kafkaTestHelper.startServers();
+  }
+
+  @AfterSuite(alwaysRun = true)
+  public void afterSuite()
+      throws IOException {
+    try {
+      _kafkaTestHelper.stopClients();
+    } finally {
+      _kafkaTestHelper.stopServers();
+    }
+  }
+
+  /**
+   * Provisions {@code topic} and builds writer properties that target it on the local server.
+   *
+   * @param topic topic to provision and write to
+   * @param valueSerializer fully-qualified class name of the producer's value serializer
+   * @return writer properties with topic, bootstrap servers and value serializer set
+   */
+  private Properties newWriterProps(String topic, String valueSerializer)
+      throws IOException {
+    _kafkaTestHelper.provisionTopic(topic);
+    Properties props = new Properties();
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers",
+        "127.0.0.1:" + _kafkaTestHelper.getKafkaServerPort());
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer",
+        valueSerializer);
+    return props;
+  }
+
+  /** Configures the producer to use {@link ConfigDrivenMd5SchemaRegistry} as its schema registry. */
+  private static void useMd5SchemaRegistry(Properties props) {
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX
+            + KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_CLASS,
+        ConfigDrivenMd5SchemaRegistry.class.getCanonicalName());
+  }
+
+  /** Writes a string with {@code StringSerializer}, flushes, and reads it back from the topic. */
+  @Test
+  public void testStringSerialization()
+      throws IOException, InterruptedException, ExecutionException {
+    String topic = "testStringSerialization1";
+    Properties props = newWriterProps(topic, "org.apache.kafka.common.serialization.StringSerializer");
+    Kafka1DataWriter<String, String> kafka1DataWriter = new Kafka1DataWriter<>(props);
+    String messageString = "foobar";
+    WriteCallback callback = mock(WriteCallback.class);
+
+    try {
+      Future<WriteResponse> future = kafka1DataWriter.write(messageString, callback);
+      kafka1DataWriter.flush();
+      verify(callback, times(1)).onSuccess(isA(WriteResponse.class));
+      verify(callback, never()).onFailure(isA(Exception.class));
+      Assert.assertTrue(future.isDone(), "Future should be done");
+      log.info("Write response: {}", future.get().getStringResponse());
+      byte[] message = _kafkaTestHelper.getIteratorForTopic(topic).next().message();
+      Assert.assertEquals(new String(message, UTF_8), messageString);
+    } finally {
+      kafka1DataWriter.close();
+    }
+  }
+
+  /** Writes random bytes with {@code ByteArraySerializer} and checks they arrive unchanged. */
+  @Test
+  public void testBinarySerialization()
+      throws IOException {
+    String topic = "testBinarySerialization1";
+    Properties props = newWriterProps(topic, "org.apache.kafka.common.serialization.ByteArraySerializer");
+    Kafka1DataWriter<String, byte[]> kafka1DataWriter = new Kafka1DataWriter<>(props);
+    WriteCallback callback = mock(WriteCallback.class);
+    byte[] messageBytes = TestUtils.generateRandomBytes();
+
+    try {
+      kafka1DataWriter.write(messageBytes, callback);
+    } finally {
+      // close() is relied upon to deliver the pending record before the callback is verified.
+      kafka1DataWriter.close();
+    }
+
+    verify(callback, times(1)).onSuccess(isA(WriteResponse.class));
+    verify(callback, never()).onFailure(isA(Exception.class));
+    byte[] message = _kafkaTestHelper.getIteratorForTopic(topic).next().message();
+    Assert.assertEquals(message, messageBytes);
+  }
+
+  /** Writes an Avro record with {@link LiAvroSerializer} and round-trips it through {@link LiAvroDeserializer}. */
+  @Test
+  public void testAvroSerialization()
+      throws IOException, SchemaRegistryException {
+    String topic = "testAvroSerialization1";
+    Properties props = newWriterProps(topic, LiAvroSerializer.class.getName());
+    useMd5SchemaRegistry(props);
+
+    Kafka1DataWriter<String, GenericRecord> kafka1DataWriter = new Kafka1DataWriter<>(props);
+    WriteCallback callback = mock(WriteCallback.class);
+
+    GenericRecord record = TestUtils.generateRandomAvroRecord();
+    try {
+      kafka1DataWriter.write(record, callback);
+    } finally {
+      kafka1DataWriter.close();
+    }
+
+    log.info("Kafka events written");
+
+    verify(callback, times(1)).onSuccess(isA(WriteResponse.class));
+    verify(callback, never()).onFailure(isA(Exception.class));
+
+    byte[] message = _kafkaTestHelper.getIteratorForTopic(topic).next().message();
+
+    log.info("Kafka events read, start to check result... ");
+    ConfigDrivenMd5SchemaRegistry schemaReg = new ConfigDrivenMd5SchemaRegistry(topic, record.getSchema());
+    LiAvroDeserializer deser = new LiAvroDeserializer(schemaReg);
+    GenericRecord receivedRecord = deser.deserialize(topic, message);
+    Assert.assertEquals(receivedRecord.toString(), record.toString());
+  }
+
+  /** Writes a keyed Avro record; the Kafka key must equal the configured key field's value. */
+  @Test
+  public void testKeyedAvroSerialization()
+      throws IOException, SchemaRegistryException, SerializationException {
+    // Own topic: reading next() from a topic shared with another test would be order dependent.
+    String topic = "testKeyedAvroSerialization1";
+    Properties props = newWriterProps(topic, LiAvroSerializer.class.getName());
+    props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYED_CONFIG, "true");
+    String keyField = "field1";
+    props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYFIELD_CONFIG, keyField);
+    useMd5SchemaRegistry(props);
+
+    Kafka1DataWriter<String, GenericRecord> kafka1DataWriter = new Kafka1DataWriter<>(props);
+    WriteCallback callback = mock(WriteCallback.class);
+
+    GenericRecord record = TestUtils.generateRandomAvroRecord();
+    try {
+      kafka1DataWriter.write(record, callback);
+    } finally {
+      kafka1DataWriter.close();
+    }
+
+    verify(callback, times(1)).onSuccess(isA(WriteResponse.class));
+    verify(callback, never()).onFailure(isA(Exception.class));
+    MessageAndMetadata<byte[], byte[]> value = _kafkaTestHelper.getIteratorForTopic(topic).next();
+    ConfigDrivenMd5SchemaRegistry schemaReg = new ConfigDrivenMd5SchemaRegistry(topic, record.getSchema());
+    LiAvroDeserializer deser = new LiAvroDeserializer(schemaReg);
+    GenericRecord receivedRecord = deser.deserialize(topic, value.message());
+    Assert.assertEquals(receivedRecord.toString(), record.toString());
+    Assert.assertEquals(new String(value.key(), UTF_8), record.get(keyField));
+  }
+
+  /** Writes only the configured value field (as a string), keyed by the same field. */
+  @Test
+  public void testValueSerialization()
+      throws IOException, InterruptedException, SchemaRegistryException {
+    // Own topic: reading next() from a topic shared with another test would be order dependent.
+    String topic = "testValueSerialization1";
+    Properties props = newWriterProps(topic, "org.apache.kafka.common.serialization.StringSerializer");
+    props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYED_CONFIG, "true");
+    String keyField = "field1";
+    props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_KEYFIELD_CONFIG, keyField);
+    props.setProperty(KafkaWriterConfigurationKeys.WRITER_KAFKA_VALUEFIELD_CONFIG, keyField);
+    useMd5SchemaRegistry(props);
+
+    Kafka1DataWriter<String, GenericRecord> kafka1DataWriter = new Kafka1DataWriter<>(props);
+    WriteCallback callback = mock(WriteCallback.class);
+
+    GenericRecord record = TestUtils.generateRandomAvroRecord();
+    try {
+      kafka1DataWriter.write(record, callback);
+    } finally {
+      kafka1DataWriter.close();
+    }
+
+    verify(callback, times(1)).onSuccess(isA(WriteResponse.class));
+    verify(callback, never()).onFailure(isA(Exception.class));
+    MessageAndMetadata<byte[], byte[]> value = _kafkaTestHelper.getIteratorForTopic(topic).next();
+    Assert.assertEquals(new String(value.message(), UTF_8), record.get(keyField));
+    Assert.assertEquals(new String(value.key(), UTF_8), record.get(keyField));
+  }
+
+}
diff --git a/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/writer/Kafka1TopicProvisionTest.java b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/writer/Kafka1TopicProvisionTest.java
new file mode 100644
index 0000000..0117282
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/kafka/writer/Kafka1TopicProvisionTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.kafka.KafkaClusterTestBase;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.json.JSONObject;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+
+/**
+ * Tests topic provisioning by {@link Kafka1DataWriter} on a multi-broker {@link KafkaClusterTestBase}
+ * cluster. All test methods are currently disabled ({@code enabled = false}).
+ */
+@Slf4j
+public class Kafka1TopicProvisionTest {
+
+  private static final int TEST_CLUSTER_COUNT = 5;
+  private static final int ZK_SESSION_TIMEOUT_MS = 10 * 1000;
+  private static final int ZK_CONNECTION_TIMEOUT_MS = 8 * 1000;
+  private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;
+
+  private final KafkaClusterTestBase _kafkaTestHelper;
+
+  public Kafka1TopicProvisionTest()
+      throws InterruptedException, RuntimeException {
+    _kafkaTestHelper = new KafkaClusterTestBase(TEST_CLUSTER_COUNT);
+  }
+
+  @BeforeSuite(alwaysRun = true)
+  public void beforeSuite() {
+    log.info("Process id = " + ManagementFactory.getRuntimeMXBean().getName());
+    _kafkaTestHelper.startCluster();
+  }
+
+  @AfterSuite(alwaysRun = true)
+  public void afterSuite()
+      throws IOException {
+    _kafkaTestHelper.stopCluster();
+  }
+
+  /**
+   * Opens a {@link ZkClient} that uses Kafka's {@code ZKStringSerializer}.
+   *
+   * <p>Note: the ZkClient must be initialized with ZKStringSerializer. Otherwise createTopic() only
+   * seems to work (it returns without error): the topic exists only in ZooKeeper and is returned
+   * when listing topics, but Kafka itself does not create it.
+   */
+  private static ZkClient newZkClient(String zookeeperConnect) {
+    return new ZkClient(zookeeperConnect, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS,
+        ZKStringSerializer$.MODULE$);
+  }
+
+  /** Checks that every broker of the test cluster registered itself in ZooKeeper with its port. */
+  @Test(enabled = false)
+  public void testCluster()
+      throws IOException, InterruptedException, KeeperException {
+    int clusterCount = _kafkaTestHelper.getClusterCount();
+    Assert.assertEquals(clusterCount, TEST_CLUSTER_COUNT);
+    int zkPort = _kafkaTestHelper.getZookeeperPort();
+    log.info("kafkaBrokerPortList : " + _kafkaTestHelper.getKafkaBrokerPortList());
+
+    List<Integer> brokerPortList = new ArrayList<>();
+    ZooKeeper zk = new ZooKeeper("localhost:" + zkPort, 11000, new ByPassWatcher());
+    try {
+      for (String id : zk.getChildren("/brokers/ids", false)) {
+        String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null), UTF_8);
+        int brokerPort = new JSONObject(brokerInfo).getInt("port");
+        log.info("Broker {} registered port {}", id, brokerPort);
+        brokerPortList.add(brokerPort);
+      }
+    } finally {
+      zk.close();
+    }
+
+    // ZooKeeper does not guarantee the order of getChildren(), so compare the port lists sorted.
+    List<Integer> expectedPorts = new ArrayList<>(_kafkaTestHelper.getKafkaBrokerPortList());
+    Collections.sort(expectedPorts);
+    Collections.sort(brokerPortList);
+    Assert.assertEquals(brokerPortList, expectedPorts);
+  }
+
+  /** Creates a topic through the writer's provisioning settings and checks its partition count in ZooKeeper. */
+  @Test(enabled = false)
+  public void testTopicPartitionCreationCount()
+      throws IOException, InterruptedException, ExecutionException {
+    String topic = "topicPartition4";
+    int clusterCount = _kafkaTestHelper.getClusterCount();
+    int partitionCount = clusterCount / 2;
+    String zookeeperConnect = "localhost:" + _kafkaTestHelper.getZookeeperPort();
+    Properties props = new Properties();
+
+    // Setting Topic Properties
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
+    props.setProperty(KafkaWriterConfigurationKeys.REPLICATION_COUNT, String.valueOf(clusterCount));
+    props.setProperty(KafkaWriterConfigurationKeys.PARTITION_COUNT, String.valueOf(partitionCount));
+    props.setProperty(KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER, zookeeperConnect);
+
+    // Setting Producer Properties
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers",
+        _kafkaTestHelper.getBootServersList());
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer",
+        "org.apache.kafka.common.serialization.StringSerializer");
+
+    // Nothing else in this test creates the topic; the writer is expected to provision it from the
+    // settings above.
+    Kafka1DataWriter<String, String> kafka1DataWriter = new Kafka1DataWriter<>(props);
+    try {
+      ZkClient zkClient = newZkClient(zookeeperConnect);
+      try {
+        boolean isSecureKafkaCluster = false;
+        ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);
+        Integer actualPartitionCount = (Integer) zkUtils.getTopicPartitionCount(topic).get();
+        Assert.assertEquals(actualPartitionCount.intValue(), partitionCount);
+      } finally {
+        zkClient.close();
+      }
+    } finally {
+      kafka1DataWriter.close();
+    }
+  }
+
+  /**
+   * Same as {@link #testTopicPartitionCreationCount()} but against a live cluster described by the
+   * {@code live.*} system properties. Does nothing when {@code live.cluster.count} is unset.
+   */
+  @Test(enabled = false)
+  public void testLiveTopicPartitionCreationCount()
+      throws IOException, InterruptedException, ExecutionException {
+    String liveClusterCount = System.getProperty("live.cluster.count");
+    String liveZookeeper = System.getProperty("live.zookeeper");
+    String liveBroker = System.getProperty("live.broker");
+    String topic = System.getProperty("live.newtopic");
+    String topicReplicationCount = System.getProperty("live.newtopic.replicationCount");
+    String topicPartitionCount = System.getProperty("live.newtopic.partitionCount");
+    if (StringUtils.isEmpty(liveClusterCount)) {
+      // No live cluster configured: nothing to verify.
+      return;
+    }
+    if (StringUtils.isEmpty(topicPartitionCount)) {
+      int clusterCount = Integer.parseInt(liveClusterCount) - 1;
+      topicReplicationCount = String.valueOf(clusterCount);
+      topicPartitionCount = String.valueOf(clusterCount / 2);
+    }
+
+    Properties props = new Properties();
+    // Setting Topic Properties
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
+    props.setProperty(KafkaWriterConfigurationKeys.REPLICATION_COUNT, topicReplicationCount);
+    props.setProperty(KafkaWriterConfigurationKeys.PARTITION_COUNT, topicPartitionCount);
+    props.setProperty(KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER, liveZookeeper);
+    // Setting Producer Properties
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers", liveBroker);
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer",
+        "org.apache.kafka.common.serialization.StringSerializer");
+
+    Kafka1DataWriter<String, String> kafka1DataWriter = new Kafka1DataWriter<>(props);
+    try {
+      Properties adminConfig = new Properties();
+      // The topic lives on the live cluster, so describe it there rather than on the embedded test cluster.
+      adminConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, liveBroker);
+      AdminClient adminClient = AdminClient.create(adminConfig);
+      try {
+        // Note: AdminUtils.fetchTopicMetadataFromZk is deprecated after 0.10.0; AdminClient is used instead.
+        DescribeTopicsResult describer = adminClient.describeTopics(Collections.singletonList(topic));
+        Assert.assertEquals(describer.values().get(topic).get().partitions().size(),
+            Integer.parseInt(topicPartitionCount));
+      } finally {
+        adminClient.close();
+      }
+    } finally {
+      kafka1DataWriter.close();
+    }
+  }
+
+}
diff --git a/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java
new file mode 100644
index 0000000..07b1905
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
+import kafka.consumer.ConsumerIterator;
+import kafka.message.MessageAndMetadata;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gobblin.kafka.KafkaTestBase;
+import org.apache.gobblin.metrics.kafka.KafkaKeyValueProducerPusher;
+import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+
+
+/**
+ * Test {@link KafkaKeyValueProducerPusher}: pushes two key/value pairs and reads them back in order.
+ */
+public class KafkaKeyValueProducerPusherTest {
+  public static final String TOPIC = KafkaKeyValueProducerPusherTest.class.getSimpleName();
+
+  private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;
+
+  private KafkaTestBase kafkaTestHelper;
+
+  @BeforeClass
+  public void setup() throws Exception {
+    kafkaTestHelper = new KafkaTestBase();
+    kafkaTestHelper.startServers();
+
+    kafkaTestHelper.provisionTopic(TOPIC);
+  }
+
+  @Test
+  public void test() throws IOException {
+    String msg1 = "msg1";
+    String msg2 = "msg2";
+
+    // Test that the scoped config overrides the generic config: the positional broker string is a
+    // placeholder ("dummy" port), so messages can only arrive if the BOOTSTRAP_SERVERS_CONFIG override applies.
+    // Keys are Strings (the test decodes them back to "key1"/"key2"), hence <String, byte[]>.
+    try (Pusher<Pair<String, byte[]>> pusher = new KafkaKeyValueProducerPusher<String, byte[]>(
+        "127.0.0.1:dummy", TOPIC,
+        Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
+            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + this.kafkaTestHelper.getKafkaServerPort()))))) {
+
+      pusher.pushMessages(Lists.newArrayList(
+          Pair.of("key1", msg1.getBytes(UTF_8)), Pair.of("key2", msg2.getBytes(UTF_8))));
+
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+      }
+
+      ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
+
+      // Assert.assertTrue instead of the `assert` keyword, which is skipped unless the JVM runs with -ea.
+      Assert.assertTrue(iterator.hasNext());
+      MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
+      Assert.assertEquals(new String(messageAndMetadata.key(), UTF_8), "key1");
+      Assert.assertEquals(new String(messageAndMetadata.message(), UTF_8), msg1);
+
+      Assert.assertTrue(iterator.hasNext());
+      messageAndMetadata = iterator.next();
+      Assert.assertEquals(new String(messageAndMetadata.key(), UTF_8), "key2");
+      Assert.assertEquals(new String(messageAndMetadata.message(), UTF_8), msg2);
+    }
+  }
+
+  @AfterClass
+  public void after() {
+    try {
+      this.kafkaTestHelper.close();
+    } catch (Exception e) {
+      // Best-effort cleanup: do not fail the class, but report the cause.
+      System.err.println("Failed to close Kafka server: " + e);
+    }
+  }
+}
diff --git a/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java
new file mode 100644
index 0000000..c369cdc
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-1/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
+import kafka.consumer.ConsumerIterator;
+import org.apache.gobblin.kafka.KafkaTestBase;
+import org.apache.gobblin.metrics.kafka.KafkaProducerPusher;
+import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+
+
+/**
+ * Test {@link org.apache.gobblin.metrics.kafka.KafkaProducerPusher}: pushes two messages and reads
+ * them back in order.
+ */
+public class KafkaProducerPusherTest {
+  public static final String TOPIC = KafkaProducerPusherTest.class.getSimpleName();
+
+  private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;
+
+  private KafkaTestBase kafkaTestHelper;
+
+  @BeforeClass
+  public void setup() throws Exception {
+    kafkaTestHelper = new KafkaTestBase();
+    kafkaTestHelper.startServers();
+
+    kafkaTestHelper.provisionTopic(TOPIC);
+  }
+
+  @Test
+  public void test() throws IOException {
+    String msg1 = "msg1";
+    String msg2 = "msg2";
+
+    // Test that the scoped config overrides the generic config: the positional broker string is a
+    // placeholder ("dummy" port), so messages can only arrive if the BOOTSTRAP_SERVERS_CONFIG override applies.
+    try (Pusher<byte[]> pusher = new KafkaProducerPusher("127.0.0.1:dummy", TOPIC,
+        Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
+            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + this.kafkaTestHelper.getKafkaServerPort()))))) {
+
+      pusher.pushMessages(Lists.newArrayList(msg1.getBytes(UTF_8), msg2.getBytes(UTF_8)));
+
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+      }
+
+      ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
+
+      // Assert.assertTrue instead of the `assert` keyword, which is skipped unless the JVM runs with -ea.
+      Assert.assertTrue(iterator.hasNext());
+      Assert.assertEquals(new String(iterator.next().message(), UTF_8), msg1);
+      Assert.assertTrue(iterator.hasNext());
+      Assert.assertEquals(new String(iterator.next().message(), UTF_8), msg2);
+    }
+  }
+
+  @AfterClass
+  public void after() {
+    try {
+      this.kafkaTestHelper.close();
+    } catch (Exception e) {
+      // Best-effort cleanup: do not fail the class, but report the cause.
+      System.err.println("Failed to close Kafka server: " + e);
+    }
+  }
+}
diff --git a/gradle/scripts/defaultBuildProperties.gradle b/gradle/scripts/defaultBuildProperties.gradle
index 8b1ae17..d58fb2a 100644
--- a/gradle/scripts/defaultBuildProperties.gradle
+++ b/gradle/scripts/defaultBuildProperties.gradle
@@ -35,6 +35,7 @@ def BuildProperties BUILD_PROPERTIES = new BuildProperties(project)
"Java languange compatibility; supported versions: " + JavaVersion.VERSION_1_8))
.register(new BuildProperty("kafka08Version", "0.8.2.2", "Kafka 0.8 dependencies version"))
.register(new BuildProperty("kafka09Version", "0.9.0.1", "Kafka 0.9 dependencies version"))
+ .register(new BuildProperty("kafka1Version", "1.1.1", "Kafka 1.1 dependencies version"))
.register(new BuildProperty("pegasusVersion", "29.6.4", "Pegasus dependencies version"))
.register(new BuildProperty("publishToMaven", false, "Enable publishing of artifacts to a central Maven repository"))
.register(new BuildProperty("publishToNexus", false, "Enable publishing of artifacts to Nexus"))
@@ -67,6 +68,7 @@ BUILD_PROPERTIES.ensureDefined('hadoopVersion')
BUILD_PROPERTIES.ensureDefined('hiveVersion')
BUILD_PROPERTIES.ensureDefined('kafka08Version')
BUILD_PROPERTIES.ensureDefined('kafka09Version')
+BUILD_PROPERTIES.ensureDefined('kafka1Version')
BUILD_PROPERTIES.ensureDefined('pegasusVersion')
BUILD_PROPERTIES.ensureDefined('salesforceVersion')
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index 5f11123..99fe31f 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -85,6 +85,10 @@ ext.externalDependency = [
"kafka09": "org.apache.kafka:kafka_2.11:" + kafka09Version,
"kafka09Test": "org.apache.kafka:kafka_2.11:" + kafka09Version + ":test",
"kafka09Client": "org.apache.kafka:kafka-clients:" + kafka09Version,
+ "kafka1": "org.apache.kafka:kafka_2.11:" + kafka1Version,
+ "kafka1Test": "org.apache.kafka:kafka_2.11:" + kafka1Version + ":test",
+ "kafka1Client": "org.apache.kafka:kafka-clients:" + kafka1Version,
+ "kafka1ClientTest": "org.apache.kafka:kafka-clients:" + kafka1Version + ":test",
"confluentSchemaRegistryClient": "io.confluent:kafka-schema-registry-client:" + confluentVersion,
"confluentAvroSerializer": "io.confluent:kafka-avro-serializer:" + confluentVersion,
"confluentJsonSerializer": "io.confluent:kafka-json-serializer:" + confluentVersion,