Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/11/01 22:00:59 UTC

incubator-gobblin git commit: [Gobblin 190][GOBBLIN-190] Kafka Sink replication factor and partition creation.

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master a871e5c5d -> 603f22de0


[Gobblin 190][GOBBLIN-190] Kafka Sink replication factor and partition creation.

Closes #2126 from dallaybatta/GOBBLIN-190


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/603f22de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/603f22de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/603f22de

Branch: refs/heads/master
Commit: 603f22de008737d290a9ed18fb6a76544df119a5
Parents: a871e5c
Author: vickykak@gmail.com <vi...@gmail.com>
Authored: Wed Nov 1 15:00:54 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Wed Nov 1 15:00:54 2017 -0700

----------------------------------------------------------------------
 gobblin-modules/gobblin-kafka-09/build.gradle   |   8 +-
 .../gobblin/kafka/writer/Kafka09DataWriter.java |  42 ++++-
 .../gobblin/kafka/KafkaClusterTestBase.java     | 128 +++++++++++++
 .../gobblin/kafka/writer/ByPassWatcher.java     |  30 +++
 .../kafka/writer/Kafka09TopicProvisionTest.java | 184 +++++++++++++++++++
 .../writer/KafkaWriterConfigurationKeys.java    |  11 ++
 6 files changed, 401 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
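
To make the new configuration knobs concrete, here is a minimal, hypothetical sketch of a writer that
provisions its own topic. It mirrors the configuration used by Kafka09TopicProvisionTest later in this
commit; the package is chosen only so the package-private key constants are visible, and the topic name,
hosts and ports are placeholders, not values taken from the patch.

    package org.apache.gobblin.kafka.writer;

    import java.util.Properties;

    public class ProvisionedWriterSketch {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Topic provisioning settings introduced by this commit (writer.kafka.*); values are placeholders.
        props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, "my-provisioned-topic");
        props.setProperty(KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER, "zk-host:2181");
        props.setProperty(KafkaWriterConfigurationKeys.REPLICATION_COUNT, "3");
        props.setProperty(KafkaWriterConfigurationKeys.PARTITION_COUNT, "2");
        // Regular producer settings, passed through via the producer-config prefix.
        props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers", "broker-host:9092");
        props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        // The topic is created (if it does not already exist) when the writer is constructed,
        // since the writer calls provisionTopic() whenever writer.kafka.zookeeper is set.
        Kafka09DataWriter<String> writer = new Kafka09DataWriter<String>(props);
        writer.flush();
      }
    }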


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/603f22de/gobblin-modules/gobblin-kafka-09/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/build.gradle b/gobblin-modules/gobblin-kafka-09/build.gradle
index 44a5b80..54ba448 100644
--- a/gobblin-modules/gobblin-kafka-09/build.gradle
+++ b/gobblin-modules/gobblin-kafka-09/build.gradle
@@ -72,7 +72,13 @@ configurations {
 }
 
 test {
   workingDir rootProject.rootDir
+  systemProperty "live.newtopic", System.getProperty("live.newtopic")
+  systemProperty "live.newtopic.replicationCount", System.getProperty("live.newtopic.replicationCount")
+  systemProperty "live.newtopic.partitionCount", System.getProperty("live.newtopic.partitionCount")
+  systemProperty "live.cluster.count", System.getProperty("live.cluster.count")
+  systemProperty "live.zookeeper", System.getProperty("live.zookeeper")
+  systemProperty "live.broker", System.getProperty("live.broker")
 }
 
 ext.classification="library"
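
The systemProperty lines added to the test task above forward matching -D options from the Gradle
command line into the test JVM. Assuming the usual module task path, a live-cluster run of the
provisioning test might be launched with something like
./gradlew :gobblin-modules:gobblin-kafka-09:test -Dlive.cluster.count=3 -Dlive.zookeeper=zk-host:2181 -Dlive.broker=broker-host:9092 -Dlive.newtopic=provisionTest
(hosts and values are placeholders). When these properties are not supplied, the live test below simply
returns without asserting anything.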

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/603f22de/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
index 2cb00e1..89b637a 100644
--- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
+++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
@@ -31,8 +31,15 @@ import com.google.common.base.Throwables;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import kafka.utils.ZkUtils;
+import kafka.admin.AdminUtils;
+import kafka.utils.ZKStringSerializer$;
 import lombok.extern.slf4j.Slf4j;
 
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.writer.AsyncDataWriter;
 import org.apache.gobblin.writer.WriteCallback;
 import org.apache.gobblin.writer.WriteResponse;
@@ -49,6 +56,7 @@ import org.apache.gobblin.writer.WriteResponseMapper;
 @Slf4j
 public class Kafka09DataWriter<D> implements AsyncDataWriter<D> {
 
+  
   private static final WriteResponseMapper<RecordMetadata> WRITE_RESPONSE_WRAPPER =
       new WriteResponseMapper<RecordMetadata>() {
 
@@ -94,6 +102,7 @@ public class Kafka09DataWriter<D> implements AsyncDataWriter<D> {
 
   public Kafka09DataWriter(Producer producer, Config config) {
     this.topic = config.getString(KafkaWriterConfigurationKeys.KAFKA_TOPIC);
+    provisionTopic(topic, config);
     this.producer = producer;
   }
 
@@ -121,6 +130,37 @@ public class Kafka09DataWriter<D> implements AsyncDataWriter<D> {
   @Override
   public void flush()
       throws IOException {
     this.producer.flush();
   }
+  
+  private void provisionTopic(String topicName, Config config) {
+    String zooKeeperPropKey = KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER;
+    if (!config.hasPath(zooKeeperPropKey)) {
+      log.debug("Topic " + topicName + " has no ZooKeeper configured; skipping partition and replication provisioning");
+      return;
+    }
+    String zookeeperConnect = config.getString(zooKeeperPropKey);
+    int sessionTimeoutMs = ConfigUtils.getInt(config, KafkaWriterConfigurationKeys.ZOOKEEPER_SESSION_TIMEOUT, KafkaWriterConfigurationKeys.ZOOKEEPER_SESSION_TIMEOUT_DEFAULT);
+    int connectionTimeoutMs = ConfigUtils.getInt(config, KafkaWriterConfigurationKeys.ZOOKEEPER_CONNECTION_TIMEOUT, KafkaWriterConfigurationKeys.ZOOKEEPER_CONNECTION_TIMEOUT_DEFAULT);
+    // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
+    // createTopic() will only seem to work (it will return without error).  The topic will exist in
+    // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
+    // topic.
+    ZkClient zkClient = new ZkClient(zookeeperConnect, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$);
+    // Security for Kafka was added in Kafka 0.9.0.0
+    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), false);
+    int partitions = ConfigUtils.getInt(config, KafkaWriterConfigurationKeys.PARTITION_COUNT, KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT);
+    int replication = ConfigUtils.getInt(config, KafkaWriterConfigurationKeys.REPLICATION_COUNT, KafkaWriterConfigurationKeys.REPLICATION_COUNT_DEFAULT);
+    Properties topicConfig = new Properties();
+    if (AdminUtils.topicExists(zkUtils, topicName)) {
+      log.debug("Topic " + topicName + " already exists with replication " + replication + " and partitions " + partitions);
+      return;
+    }
+    try {
+      AdminUtils.createTopic(zkUtils, topicName, partitions, replication, topicConfig);
+    } catch (RuntimeException e) {
+      throw new RuntimeException("Failed to create topic " + topicName, e);
+    }
+    log.info("Created topic " + topicName + " with replication " + replication + " and partitions " + partitions);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/603f22de/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaClusterTestBase.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaClusterTestBase.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaClusterTestBase.java
new file mode 100644
index 0000000..8cbe983
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaClusterTestBase.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.Time;
+import kafka.utils.ZKStringSerializer$;
+import kafka.zk.EmbeddedZookeeper;
+
+import org.apache.gobblin.test.TestUtils;
+
+public class KafkaClusterTestBase extends KafkaTestBase {
+
+	int clusterCount; 
+	EmbeddedZookeeper _zkServer;
+	String _zkConnectString;
+	ZkClient _zkClient;
+	List<KafkaServer> kafkaBrokerList = new ArrayList<KafkaServer>();
+	List<Integer> kafkaBrokerPortList = new ArrayList<Integer>();
+	
+	public KafkaClusterTestBase(int clusterCount) throws InterruptedException, RuntimeException {
+		super();
+		this.clusterCount = clusterCount;
+	}
+
+	public void startCluster() {
+		// Start ZooKeeper.
+		_zkServer = new EmbeddedZookeeper();
+		_zkConnectString = "127.0.0.1:" + _zkServer.port();
+		_zkClient = new ZkClient(_zkConnectString, 30000, 30000, ZKStringSerializer$.MODULE$);
+		// Start the Kafka brokers.
+		for (int i = 0; i < clusterCount; i++) {
+			KafkaServer kafkaServer = createKafkaServer(i, _zkConnectString);
+			kafkaBrokerList.add(kafkaServer);
+		}
+	}
+
+	public void stopCluster() {
+		Iterator<KafkaServer> iter = kafkaBrokerList.iterator();
+		while(iter.hasNext()){
+			KafkaServer server = iter.next();
+			try {
+				server.shutdown(); 
+			} catch (RuntimeException e) {
+				// Simply Ignore.
+			}
+		}
+	}
+
+	public int getZookeeperPort() {
+		return _zkServer.port();
+	}
+
+	public List<KafkaServer> getBrokerList() {
+		return kafkaBrokerList;
+	}
+
+	public List<Integer> getKafkaBrokerPortList() {
+		return kafkaBrokerPortList;
+	}
+
+	
+	public int getClusterCount() {
+		return kafkaBrokerList.size();
+	}
+
+	private KafkaServer createKafkaServer(int brokerId,String _zkConnectString){
+		
+		int _brokerId = brokerId;
+		int _kafkaServerPort = TestUtils.findFreePort();	
+		Properties props = kafka.utils.TestUtils.createBrokerConfig(
+          _brokerId,
+          _zkConnectString,
+          kafka.utils.TestUtils.createBrokerConfig$default$3(),
+          kafka.utils.TestUtils.createBrokerConfig$default$4(),
+          _kafkaServerPort,
+          kafka.utils.TestUtils.createBrokerConfig$default$6(),
+          kafka.utils.TestUtils.createBrokerConfig$default$7(),
+          kafka.utils.TestUtils.createBrokerConfig$default$8(),
+          kafka.utils.TestUtils.createBrokerConfig$default$9(),
+          kafka.utils.TestUtils.createBrokerConfig$default$10(),
+          kafka.utils.TestUtils.createBrokerConfig$default$11(),
+          kafka.utils.TestUtils.createBrokerConfig$default$12(),
+          kafka.utils.TestUtils.createBrokerConfig$default$13(),
+          kafka.utils.TestUtils.createBrokerConfig$default$14()
+          );
+      KafkaConfig config = new KafkaConfig(props);
+      Time mock = new MockTime();
+      KafkaServer _kafkaServer = kafka.utils.TestUtils.createServer(config, mock);
+      kafkaBrokerPortList.add(_kafkaServerPort);
+      return _kafkaServer;
+	}
+
+	public String getBootServersList() {
+		String bootServerString = "";
+		Iterator<Integer> ports =  kafkaBrokerPortList.iterator();
+		while(ports.hasNext()){
+			Integer port = ports.next();
+			bootServerString = bootServerString+"localhost:"+port+",";
+		}
+		bootServerString = bootServerString.substring(0,bootServerString.length()-1);
+		return bootServerString;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/603f22de/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/ByPassWatcher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/ByPassWatcher.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/ByPassWatcher.java
new file mode 100644
index 0000000..0c59030
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/ByPassWatcher.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+public class ByPassWatcher implements Watcher {
+
+	@Override
+	public void process(WatchedEvent event) {
+		// Intentionally a no-op: tests use this watcher when they do not need to react to ZooKeeper events.
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/603f22de/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09TopicProvisionTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09TopicProvisionTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09TopicProvisionTest.java
new file mode 100644
index 0000000..d8b7ba0
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09TopicProvisionTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.json.JSONObject;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.kafka.KafkaClusterTestBase;
+import org.apache.commons.lang3.StringUtils;
+import kafka.admin.AdminUtils;
+import kafka.api.TopicMetadata;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+
+@Slf4j
+public class Kafka09TopicProvisionTest {
+
+
+  private final KafkaClusterTestBase _kafkaTestHelper;
+  private int testClusterCount = 5;
+  
+  public Kafka09TopicProvisionTest()
+      throws InterruptedException, RuntimeException {
+    _kafkaTestHelper = new KafkaClusterTestBase(testClusterCount);
+  }
+
+  @BeforeSuite
+  public void beforeSuite() {
+    log.info("Process id = " + ManagementFactory.getRuntimeMXBean().getName());
+    _kafkaTestHelper.startCluster();
+  }
+
+  @AfterSuite
+  public void afterSuite()
+      throws IOException {
+	  _kafkaTestHelper.stopCluster();
+  }
+
+  @Test
+  public void testCluster()
+      throws IOException, InterruptedException, KeeperException {
+    int clusterCount = _kafkaTestHelper.getClusterCount();
+    Assert.assertEquals(clusterCount, testClusterCount);
+    int zkPort = _kafkaTestHelper.getZookeeperPort();
+    String kafkaBrokerPortList = _kafkaTestHelper.getKafkaBrokerPortList().toString();
+    log.info("kafkaBrokerPortList : " + kafkaBrokerPortList);
+    ZooKeeper zk = new ZooKeeper("localhost:" + zkPort, 10000, new ByPassWatcher());
+    List<Integer> brokerPortList = new ArrayList<Integer>();
+    List<String> ids = zk.getChildren("/brokers/ids", false);
+    for (String id : ids) {
+      String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null));
+      JSONObject obj = new JSONObject(brokerInfo);
+      int brokerPort = obj.getInt("port");
+      log.info("Registered broker port: " + brokerPort);
+      brokerPortList.add(brokerPort);
+    }
+    Assert.assertEquals(brokerPortList, _kafkaTestHelper.getKafkaBrokerPortList());
+  }
+   
+  @Test
+  public void testTopicPartitionCreationCount()
+      throws IOException, InterruptedException {
+    String topic = "topicPartition4";
+    int clusterCount = _kafkaTestHelper.getClusterCount();
+    int partitionCount = clusterCount / 2;
+    int zkPort = _kafkaTestHelper.getZookeeperPort();
+    Properties props = new Properties();
+
+    // Topic properties: provision the topic across the whole cluster, with half as many partitions.
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
+    props.setProperty(KafkaWriterConfigurationKeys.REPLICATION_COUNT, String.valueOf(clusterCount));
+    props.setProperty(KafkaWriterConfigurationKeys.PARTITION_COUNT, String.valueOf(partitionCount));
+    props.setProperty(KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER, "localhost:" + zkPort);
+    log.info("Bootstrap servers: " + _kafkaTestHelper.getBootServersList());
+    
+    // Setting Producer Properties
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers", _kafkaTestHelper.getBootServersList());    
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    
+    Kafka09DataWriter<String> kafka09DataWriter = new Kafka09DataWriter<String>(props);
+    String zookeeperConnect = "localhost:"+_kafkaTestHelper.getZookeeperPort();
+    int sessionTimeoutMs = 10 * 1000;
+    int connectionTimeoutMs = 8 * 1000;
+    // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
+    // createTopic() will only seem to work (it will return without error).  The topic will exist in
+    // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
+    // topic.
+    ZkClient zkClient = new ZkClient(
+        zookeeperConnect,
+        sessionTimeoutMs,
+        connectionTimeoutMs,
+        ZKStringSerializer$.MODULE$);
+    boolean isSecureKafkaCluster = false;
+    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);
+    
+    TopicMetadata metaData =
+        AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
+    Assert.assertEquals(metaData.partitionsMetadata().size(), partitionCount);
+
+  }
+  
+  @Test
+  public void testLiveTopicPartitionCreationCount()
+      throws IOException, InterruptedException {
+    String liveClusterCount = System.getProperty("live.cluster.count");
+    String liveZookeeper = System.getProperty("live.zookeeper");
+    String liveBroker = System.getProperty("live.broker");
+    String topic = System.getProperty("live.newtopic");
+    String topicReplicationCount = System.getProperty("live.newtopic.replicationCount");
+    String topicPartitionCount = System.getProperty("live.newtopic.partitionCount");
+    if (StringUtils.isEmpty(liveClusterCount)) {
+      // No live cluster was supplied via -Dlive.cluster.count, so there is nothing to verify.
+      return;
+    }
+    if (StringUtils.isEmpty(topicPartitionCount)) {
+      // Derive defaults from the live cluster size: replicate to all but one broker and
+      // use half of that for the partition count.
+      int clusterCount = Integer.parseInt(liveClusterCount) - 1;
+      topicReplicationCount = String.valueOf(clusterCount);
+      topicPartitionCount = String.valueOf(clusterCount / 2);
+    }
+
+    Properties props = new Properties();
+    //	Setting Topic Properties
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
+    props.setProperty(KafkaWriterConfigurationKeys.REPLICATION_COUNT, topicReplicationCount);
+    props.setProperty(KafkaWriterConfigurationKeys.PARTITION_COUNT, topicPartitionCount );
+    props.setProperty(KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER, liveZookeeper);
+    // Setting Producer Properties
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers", liveBroker);    
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    
+    Kafka09DataWriter<String> kafka09DataWriter = new Kafka09DataWriter<String>(props);
+    int sessionTimeoutMs = 10 * 1000;
+    int connectionTimeoutMs = 8 * 1000;
+    // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
+    // createTopic() will only seem to work (it will return without error).  The topic will exist in
+    // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
+    // topic.
+    ZkClient zkClient = new ZkClient(
+        liveZookeeper,
+        sessionTimeoutMs,
+        connectionTimeoutMs,
+        ZKStringSerializer$.MODULE$);
+    boolean isSecureKafkaCluster = false;
+    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(liveZookeeper), isSecureKafkaCluster);
+    
+    TopicMetadata metaData =
+        AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
+    Assert.assertEquals(metaData.partitionsMetadata().size(), Integer.parseInt(topicPartitionCount));
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/603f22de/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
index f6776c0..279812e 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
@@ -50,4 +50,15 @@ public class KafkaWriterConfigurationKeys {
   static final String KAFKA_SCHEMA_REGISTRY_SWITCH_NAME = "kafka.schemaRegistry.switchName";
   static final String KAFKA_SCHEMA_REGISTRY_SWITCH_NAME_DEFAULT = "true";
 
+  public static final String KAFKA_TOPIC_CONFIG = "writer.kafka.";
+  static final String TOPIC_NAME = "topic";
+  public static final String CLUSTER_ZOOKEEPER = KAFKA_TOPIC_CONFIG + "zookeeper";
+  static final String REPLICATION_COUNT = KAFKA_TOPIC_CONFIG + "replicationCount";
+  static final int REPLICATION_COUNT_DEFAULT = 1;
+  static final String PARTITION_COUNT = KAFKA_TOPIC_CONFIG + "partitionCount";
+  static final int PARTITION_COUNT_DEFAULT = 1;
+  public static final String ZOOKEEPER_SESSION_TIMEOUT = CLUSTER_ZOOKEEPER + ".sto";
+  static final int ZOOKEEPER_SESSION_TIMEOUT_DEFAULT = 10000; // 10 seconds
+  public static final String ZOOKEEPER_CONNECTION_TIMEOUT = CLUSTER_ZOOKEEPER + ".cto";
+  static final int ZOOKEEPER_CONNECTION_TIMEOUT_DEFAULT = 8000; // 8 seconds
 }
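
For quick reference, the concatenated constants above resolve to the following property names
(defaults in parentheses):

  writer.kafka.zookeeper - ZooKeeper connect string for the target cluster; topic provisioning is skipped when this is not set
  writer.kafka.replicationCount (1) - replication factor used when creating the topic
  writer.kafka.partitionCount (1) - number of partitions used when creating the topic
  writer.kafka.zookeeper.sto (10000 ms) - ZooKeeper session timeout
  writer.kafka.zookeeper.cto (8000 ms) - ZooKeeper connection timeout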