You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/11/01 22:00:59 UTC
incubator-gobblin git commit: [Gobblin 190][GOBBLIN-190] Kafka Sink
replication factor and partition creation.
Repository: incubator-gobblin
Updated Branches:
refs/heads/master a871e5c5d -> 603f22de0
[Gobblin 190][GOBBLIN-190] Kafka Sink replication factor and partition creation.
Closes #2126 from dallaybatta/GOBBLIN-190
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/603f22de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/603f22de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/603f22de
Branch: refs/heads/master
Commit: 603f22de008737d290a9ed18fb6a76544df119a5
Parents: a871e5c
Author: vickykak@gmail.com <vi...@gmail.com>
Authored: Wed Nov 1 15:00:54 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Wed Nov 1 15:00:54 2017 -0700
----------------------------------------------------------------------
gobblin-modules/gobblin-kafka-09/build.gradle | 8 +-
.../gobblin/kafka/writer/Kafka09DataWriter.java | 42 ++++-
.../gobblin/kafka/KafkaClusterTestBase.java | 128 +++++++++++++
.../gobblin/kafka/writer/ByPassWatcher.java | 30 +++
.../kafka/writer/Kafka09TopicProvisionTest.java | 184 +++++++++++++++++++
.../writer/KafkaWriterConfigurationKeys.java | 11 ++
6 files changed, 401 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/603f22de/gobblin-modules/gobblin-kafka-09/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/build.gradle b/gobblin-modules/gobblin-kafka-09/build.gradle
index 44a5b80..54ba448 100644
--- a/gobblin-modules/gobblin-kafka-09/build.gradle
+++ b/gobblin-modules/gobblin-kafka-09/build.gradle
@@ -72,7 +72,13 @@ configurations {
}
test {
- workingDir rootProject.rootDir
+ workingDir rootProject.rootDir
+ systemProperty "live.newtopic", System.getProperty("live.newtopic")
+ systemProperty "live.newtopic.replicationCount", System.getProperty("live.newtopic.replicationCount")
+ systemProperty "live.newtopic.partitionCount", System.getProperty("live.newtopic.partitionCount")
+ systemProperty "live.cluster.count", System.getProperty("live.cluster.count")
+ systemProperty "live.zookeeper", System.getProperty("live.zookeeper")
+ systemProperty "live.broker", System.getProperty("live.broker")
}
ext.classification="library"
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/603f22de/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
index 2cb00e1..89b637a 100644
--- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
+++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
@@ -31,8 +31,15 @@ import com.google.common.base.Throwables;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import kafka.utils.ZkUtils;
+import kafka.admin.AdminUtils;
+import kafka.utils.ZKStringSerializer$;
import lombok.extern.slf4j.Slf4j;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.AsyncDataWriter;
import org.apache.gobblin.writer.WriteCallback;
import org.apache.gobblin.writer.WriteResponse;
@@ -49,6 +56,7 @@ import org.apache.gobblin.writer.WriteResponseMapper;
@Slf4j
public class Kafka09DataWriter<D> implements AsyncDataWriter<D> {
+
private static final WriteResponseMapper<RecordMetadata> WRITE_RESPONSE_WRAPPER =
new WriteResponseMapper<RecordMetadata>() {
@@ -94,6 +102,7 @@ public class Kafka09DataWriter<D> implements AsyncDataWriter<D> {
public Kafka09DataWriter(Producer producer, Config config) {
this.topic = config.getString(KafkaWriterConfigurationKeys.KAFKA_TOPIC);
+ provisionTopic(topic,config);
this.producer = producer;
}
@@ -121,6 +130,37 @@ public class Kafka09DataWriter<D> implements AsyncDataWriter<D> {
@Override
public void flush()
throws IOException {
- this.producer.flush();
+ this.producer.flush();
}
+
+ /**
+  * Creates the destination topic with the configured partition and replication counts, provided a
+  * ZooKeeper connect string is configured and the topic is not registered yet.
+  *
+  * <p>Returns without doing anything when {@link KafkaWriterConfigurationKeys#CLUSTER_ZOOKEEPER} is
+  * absent from {@code config}, or when the topic already exists.
+  *
+  * @param topicName name of the topic to provision
+  * @param config writer configuration holding the ZooKeeper, partition and replication settings
+  * @throws RuntimeException if the topic cannot be created
+  */
+ private void provisionTopic(String topicName, Config config) {
+   String zooKeeperPropKey = KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER;
+   if (!config.hasPath(zooKeeperPropKey)) {
+     log.debug("Topic " + topicName + " is configured without the partition and replication");
+     return;
+   }
+   String zookeeperConnect = config.getString(zooKeeperPropKey);
+   int sessionTimeoutMs = ConfigUtils.getInt(config, KafkaWriterConfigurationKeys.ZOOKEEPER_SESSION_TIMEOUT,
+       KafkaWriterConfigurationKeys.ZOOKEEPER_SESSION_TIMEOUT_DEFAULT);
+   int connectionTimeoutMs = ConfigUtils.getInt(config, KafkaWriterConfigurationKeys.ZOOKEEPER_CONNECTION_TIMEOUT,
+       KafkaWriterConfigurationKeys.ZOOKEEPER_CONNECTION_TIMEOUT_DEFAULT);
+   // Read the topic settings before connecting so a malformed value fails without opening a session.
+   int partitions = ConfigUtils.getInt(config, KafkaWriterConfigurationKeys.PARTITION_COUNT,
+       KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT);
+   // The replication count falls back to its own default, not to the partition default.
+   int replication = ConfigUtils.getInt(config, KafkaWriterConfigurationKeys.REPLICATION_COUNT,
+       KafkaWriterConfigurationKeys.REPLICATION_COUNT_DEFAULT);
+
+   // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then
+   // createTopic() will only seem to work (it will return without error). The topic will exist in
+   // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
+   // topic.
+   ZkClient zkClient =
+       new ZkClient(zookeeperConnect, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$);
+   try {
+     // Security for Kafka was added in Kafka 0.9.0.0; the last argument marks the cluster as non-secure.
+     ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), false);
+     if (AdminUtils.topicExists(zkUtils, topicName)) {
+       log.debug("Topic {} already exists; requested replication: {} and partitions: {} were not applied",
+           topicName, replication, partitions);
+       return;
+     }
+     try {
+       AdminUtils.createTopic(zkUtils, topicName, partitions, replication, new Properties());
+     } catch (RuntimeException e) {
+       // Keep the exception type callers already see, but say which topic and settings failed.
+       throw new RuntimeException("Failed to create topic " + topicName + " with replication: " + replication
+           + " and partitions: " + partitions, e);
+     }
+     log.info("Created Topic " + topicName + " with replication: " + replication + " and partitions :" + partitions);
+   } finally {
+     // The client is only needed for provisioning; release the ZooKeeper session on every exit path.
+     zkClient.close();
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/603f22de/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaClusterTestBase.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaClusterTestBase.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaClusterTestBase.java
new file mode 100644
index 0000000..8cbe983
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaClusterTestBase.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.Time;
+import kafka.utils.ZKStringSerializer$;
+import kafka.zk.EmbeddedZookeeper;
+
+import org.apache.gobblin.test.TestUtils;
+
+/**
+ * Test helper that runs an embedded ZooKeeper server together with a configurable number of
+ * Kafka brokers, each listening on its own free port.
+ */
+public class KafkaClusterTestBase extends KafkaTestBase {
+
+  int clusterCount;
+  EmbeddedZookeeper _zkServer;
+  String _zkConnectString;
+  ZkClient _zkClient;
+  List<KafkaServer> kafkaBrokerList = new ArrayList<KafkaServer>();
+  List<Integer> kafkaBrokerPortList = new ArrayList<Integer>();
+
+  /**
+   * @param clusterCount number of Kafka brokers that {@link #startCluster()} will launch
+   */
+  public KafkaClusterTestBase(int clusterCount) throws InterruptedException, RuntimeException {
+    super();
+    this.clusterCount = clusterCount;
+  }
+
+  /**
+   * Starts the embedded ZooKeeper server, connects a client to it and launches
+   * {@code clusterCount} brokers with ids {@code 0..clusterCount-1}.
+   */
+  public void startCluster() {
+    // Start Zookeeper.
+    _zkServer = new EmbeddedZookeeper();
+    _zkConnectString = "127.0.0.1:" + _zkServer.port();
+    _zkClient = new ZkClient(_zkConnectString, 30000, 30000, ZKStringSerializer$.MODULE$);
+    // Start Kafka Cluster.
+    for (int brokerId = 0; brokerId < clusterCount; brokerId++) {
+      kafkaBrokerList.add(createKafkaServer(brokerId, _zkConnectString));
+    }
+  }
+
+  /**
+   * Shuts down every started broker, then the ZooKeeper client and the embedded ZooKeeper server.
+   * Broker shutdown is best effort: a failing broker does not prevent the others from stopping.
+   */
+  public void stopCluster() {
+    for (KafkaServer server : kafkaBrokerList) {
+      try {
+        server.shutdown();
+      } catch (RuntimeException ignored) {
+        // Best-effort teardown: keep going so the remaining brokers and ZooKeeper still stop.
+      }
+    }
+    // Both are null when startCluster() was never called.
+    if (_zkClient != null) {
+      _zkClient.close();
+      _zkClient = null;
+    }
+    if (_zkServer != null) {
+      _zkServer.shutdown();
+    }
+  }
+
+  /** @return the port of the embedded ZooKeeper server; requires {@link #startCluster()} first */
+  public int getZookeeperPort() {
+    return _zkServer.port();
+  }
+
+  /** @return the live list of brokers created by {@link #startCluster()} */
+  public List<KafkaServer> getBrokerList() {
+    return kafkaBrokerList;
+  }
+
+  /** @return the live list of broker ports, in broker creation order */
+  public List<Integer> getKafkaBrokerPortList() {
+    return kafkaBrokerPortList;
+  }
+
+  /** @return the number of brokers that have been created so far */
+  public int getClusterCount() {
+    return kafkaBrokerList.size();
+  }
+
+  /**
+   * Creates and starts one broker on a free port and records that port.
+   *
+   * @param brokerId id assigned to the broker
+   * @param _zkConnectString ZooKeeper connect string the broker registers with
+   * @return the started broker
+   */
+  private KafkaServer createKafkaServer(int brokerId, String _zkConnectString) {
+    int kafkaServerPort = TestUtils.findFreePort();
+    // createBrokerConfig is a Scala method with default arguments; from Java every default must be
+    // passed explicitly through its generated createBrokerConfig$default$N() accessor.
+    Properties props = kafka.utils.TestUtils.createBrokerConfig(
+        brokerId,
+        _zkConnectString,
+        kafka.utils.TestUtils.createBrokerConfig$default$3(),
+        kafka.utils.TestUtils.createBrokerConfig$default$4(),
+        kafkaServerPort,
+        kafka.utils.TestUtils.createBrokerConfig$default$6(),
+        kafka.utils.TestUtils.createBrokerConfig$default$7(),
+        kafka.utils.TestUtils.createBrokerConfig$default$8(),
+        kafka.utils.TestUtils.createBrokerConfig$default$9(),
+        kafka.utils.TestUtils.createBrokerConfig$default$10(),
+        kafka.utils.TestUtils.createBrokerConfig$default$11(),
+        kafka.utils.TestUtils.createBrokerConfig$default$12(),
+        kafka.utils.TestUtils.createBrokerConfig$default$13(),
+        kafka.utils.TestUtils.createBrokerConfig$default$14()
+    );
+    KafkaConfig config = new KafkaConfig(props);
+    Time mock = new MockTime();
+    KafkaServer kafkaServer = kafka.utils.TestUtils.createServer(config, mock);
+    kafkaBrokerPortList.add(kafkaServerPort);
+    return kafkaServer;
+  }
+
+  /**
+   * Builds a {@code bootstrap.servers} style string from the recorded broker ports.
+   *
+   * @return comma-separated {@code localhost:port} entries, or an empty string when no broker
+   *         has been started
+   */
+  public String getBootServersList() {
+    StringBuilder bootServers = new StringBuilder();
+    for (Integer port : kafkaBrokerPortList) {
+      if (bootServers.length() > 0) {
+        bootServers.append(',');
+      }
+      bootServers.append("localhost:").append(port);
+    }
+    return bootServers.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/603f22de/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/ByPassWatcher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/ByPassWatcher.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/ByPassWatcher.java
new file mode 100644
index 0000000..0c59030
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/ByPassWatcher.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+/**
+ * A ZooKeeper {@link Watcher} that ignores every event it is handed. Useful where an API
+ * requires a watcher but the caller has no interest in the notifications.
+ */
+public class ByPassWatcher implements Watcher {
+
+  /**
+   * Receives the notification and discards it.
+   *
+   * @param ignoredEvent the delivered event; never inspected
+   */
+  @Override
+  public void process(WatchedEvent ignoredEvent) {
+    // Intentionally empty: events are dropped.
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/603f22de/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09TopicProvisionTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09TopicProvisionTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09TopicProvisionTest.java
new file mode 100644
index 0000000..d8b7ba0
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09TopicProvisionTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.json.JSONObject;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.kafka.KafkaClusterTestBase;
+import org.apache.commons.lang3.StringUtils;
+import kafka.admin.AdminUtils;
+import kafka.api.TopicMetadata;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+
+/**
+ * Tests that constructing a {@link Kafka09DataWriter} provisions its topic with the configured
+ * partition count, against an embedded cluster and optionally against a live cluster described
+ * by the {@code live.*} system properties.
+ */
+@Slf4j
+public class Kafka09TopicProvisionTest {
+
+  private static final int ZK_SESSION_TIMEOUT_MS = 10 * 1000;
+  private static final int ZK_CONNECTION_TIMEOUT_MS = 8 * 1000;
+
+  private final KafkaClusterTestBase _kafkaTestHelper;
+  private final int testClusterCount = 5;
+
+  public Kafka09TopicProvisionTest()
+      throws InterruptedException, RuntimeException {
+    _kafkaTestHelper = new KafkaClusterTestBase(testClusterCount);
+  }
+
+  @BeforeSuite
+  public void beforeSuite() {
+    log.info("Process id = " + ManagementFactory.getRuntimeMXBean().getName());
+    _kafkaTestHelper.startCluster();
+  }
+
+  @AfterSuite
+  public void afterSuite()
+      throws IOException {
+    _kafkaTestHelper.stopCluster();
+  }
+
+  /**
+   * Verifies that every started broker registered itself in ZooKeeper under
+   * {@code /brokers/ids} with the port the helper assigned to it.
+   */
+  @Test
+  public void testCluster()
+      throws IOException, InterruptedException, KeeperException {
+    int clusterCount = _kafkaTestHelper.getClusterCount();
+    Assert.assertEquals(clusterCount, testClusterCount);
+    int zkPort = _kafkaTestHelper.getZookeeperPort();
+    log.info("kafkaBrokerPortList : " + _kafkaTestHelper.getKafkaBrokerPortList());
+
+    List<Integer> registeredPorts = new ArrayList<Integer>();
+    ZooKeeper zk = new ZooKeeper("localhost:" + zkPort, 10000, new ByPassWatcher());
+    try {
+      for (String id : zk.getChildren("/brokers/ids", false)) {
+        // Decode explicitly as UTF-8 rather than with the platform default charset.
+        String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null),
+            java.nio.charset.StandardCharsets.UTF_8);
+        registeredPorts.add(new JSONObject(brokerInfo).getInt("port"));
+      }
+    } finally {
+      zk.close();
+    }
+    log.info("registeredPorts : " + registeredPorts);
+
+    // ZooKeeper does not guarantee the order of children, so compare the ports order-independently.
+    List<Integer> expectedPorts = new ArrayList<Integer>(_kafkaTestHelper.getKafkaBrokerPortList());
+    java.util.Collections.sort(expectedPorts);
+    java.util.Collections.sort(registeredPorts);
+    Assert.assertEquals(registeredPorts, expectedPorts);
+  }
+
+  /**
+   * Constructs a writer for a new topic on the embedded cluster and checks that the topic ends
+   * up with the requested number of partitions.
+   */
+  @Test
+  public void testTopicPartitionCreationCount()
+      throws IOException, InterruptedException {
+    String topic = "topicPartition4";
+    int clusterCount = _kafkaTestHelper.getClusterCount();
+    int partionCount = clusterCount / 2;
+    String zookeeperConnect = "localhost:" + _kafkaTestHelper.getZookeeperPort();
+    String bootstrapServers = _kafkaTestHelper.getBootServersList();
+    log.info(bootstrapServers);
+
+    Properties props = new Properties();
+    // Setting Topic Properties
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
+    props.setProperty(KafkaWriterConfigurationKeys.REPLICATION_COUNT, String.valueOf(clusterCount));
+    props.setProperty(KafkaWriterConfigurationKeys.PARTITION_COUNT, String.valueOf(partionCount));
+    props.setProperty(KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER, zookeeperConnect);
+    // Setting Producer Properties
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers",
+        bootstrapServers);
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer",
+        "org.apache.kafka.common.serialization.StringSerializer");
+
+    // Only the construction matters: it is expected to provision the topic as a side effect.
+    // NOTE(review): the writer is never closed here; confirm whether it should be.
+    new Kafka09DataWriter<String>(props);
+
+    Assert.assertEquals(fetchPartitionCount(zookeeperConnect, topic), partionCount);
+  }
+
+  /**
+   * Same check as {@link #testTopicPartitionCreationCount()} but against a live cluster.
+   * Returns immediately when the {@code live.cluster.count} system property is empty.
+   * When {@code live.newtopic.partitionCount} is empty, both the replication count and the
+   * partition count are derived from the cluster size.
+   */
+  @Test
+  public void testLiveTopicPartitionCreationCount()
+      throws IOException, InterruptedException {
+    String liveClusterCount = System.getProperty("live.cluster.count");
+    String liveZookeeper = System.getProperty("live.zookeeper");
+    String liveBroker = System.getProperty("live.broker");
+    String topic = System.getProperty("live.newtopic");
+    String topicReplicationCount = System.getProperty("live.newtopic.replicationCount");
+    String topicPartitionCount = System.getProperty("live.newtopic.partitionCount");
+    if (StringUtils.isEmpty(liveClusterCount)) {
+      // No live cluster configured: nothing to verify.
+      return;
+    }
+    // Fail with a readable message instead of a NullPointerException from Properties.setProperty.
+    Assert.assertFalse(StringUtils.isEmpty(liveZookeeper), "live.zookeeper must be set for the live test");
+    Assert.assertFalse(StringUtils.isEmpty(liveBroker), "live.broker must be set for the live test");
+    Assert.assertFalse(StringUtils.isEmpty(topic), "live.newtopic must be set for the live test");
+
+    if (StringUtils.isEmpty(topicPartitionCount)) {
+      int replicationCount = deriveReplicationCount(liveClusterCount);
+      topicReplicationCount = String.valueOf(replicationCount);
+      // Kafka rejects a topic with zero partitions, so never go below one.
+      topicPartitionCount = String.valueOf(Math.max(1, replicationCount / 2));
+    } else if (StringUtils.isEmpty(topicReplicationCount)) {
+      topicReplicationCount = String.valueOf(deriveReplicationCount(liveClusterCount));
+    }
+
+    Properties props = new Properties();
+    // Setting Topic Properties
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
+    props.setProperty(KafkaWriterConfigurationKeys.REPLICATION_COUNT, topicReplicationCount);
+    props.setProperty(KafkaWriterConfigurationKeys.PARTITION_COUNT, topicPartitionCount);
+    props.setProperty(KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER, liveZookeeper);
+    // Setting Producer Properties
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers", liveBroker);
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer",
+        "org.apache.kafka.common.serialization.StringSerializer");
+
+    // Only the construction matters: it is expected to provision the topic as a side effect.
+    new Kafka09DataWriter<String>(props);
+
+    Assert.assertEquals(fetchPartitionCount(liveZookeeper, topic), Integer.parseInt(topicPartitionCount));
+  }
+
+  /** Derives a replication count of one less than the cluster size, but at least one. */
+  private static int deriveReplicationCount(String liveClusterCount) {
+    return Math.max(1, Integer.parseInt(liveClusterCount) - 1);
+  }
+
+  /** Reads the topic metadata from ZooKeeper and returns its partition count, closing the client afterwards. */
+  private static int fetchPartitionCount(String zookeeperConnect, String topic) {
+    // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then
+    // createTopic() will only seem to work (it will return without error). The topic will exist in
+    // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
+    // topic.
+    ZkClient zkClient = new ZkClient(
+        zookeeperConnect,
+        ZK_SESSION_TIMEOUT_MS,
+        ZK_CONNECTION_TIMEOUT_MS,
+        ZKStringSerializer$.MODULE$);
+    try {
+      boolean isSecureKafkaCluster = false;
+      ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);
+      TopicMetadata metaData = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
+      return metaData.partitionsMetadata().size();
+    } finally {
+      zkClient.close();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/603f22de/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
index f6776c0..279812e 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
@@ -50,4 +50,15 @@ public class KafkaWriterConfigurationKeys {
static final String KAFKA_SCHEMA_REGISTRY_SWITCH_NAME = "kafka.schemaRegistry.switchName";
static final String KAFKA_SCHEMA_REGISTRY_SWITCH_NAME_DEFAULT = "true";
+ public static final String KAFKA_TOPIC_CONFIG = "writer.kafka."; // common prefix for the topic-provisioning keys below
+ static final String TOPIC_NAME = "topic"; // NOTE(review): not combined with the prefix here; confirm where this key is read
+ public static final String CLUSTER_ZOOKEEPER = KAFKA_TOPIC_CONFIG + "zookeeper"; // key for the ZooKeeper connect string ("writer.kafka.zookeeper")
+ static final String REPLICATION_COUNT = KAFKA_TOPIC_CONFIG + "replicationCount"; // key for the replication factor of a newly created topic
+ static final int REPLICATION_COUNT_DEFAULT = 1; // replication factor used when the key is not set
+ static final String PARTITION_COUNT = KAFKA_TOPIC_CONFIG + "partitionCount"; // key for the partition count of a newly created topic
+ static final int PARTITION_COUNT_DEFAULT = 1; // partition count used when the key is not set
+ public static final String ZOOKEEPER_SESSION_TIMEOUT = CLUSTER_ZOOKEEPER + ".sto"; // key for the ZooKeeper session timeout ("sto"), in milliseconds
+ static final int ZOOKEEPER_SESSION_TIMEOUT_DEFAULT = 10000; // 10 seconds, expressed in milliseconds
+ public static final String ZOOKEEPER_CONNECTION_TIMEOUT = CLUSTER_ZOOKEEPER + ".cto"; // key for the ZooKeeper connection timeout ("cto"), in milliseconds
+ static final int ZOOKEEPER_CONNECTION_TIMEOUT_DEFAULT = 8000; // 8 seconds, expressed in milliseconds
}