You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/04/26 03:57:45 UTC

git commit: kafka-871; Rename ZkConfig properties; patched by Swapnil Ghike; reviewed by Jun Rao

Updated Branches:
  refs/heads/0.8 ede85875a -> 2d7403174


kafka-871; Rename ZkConfig properties; patched by Swapnil Ghike; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2d740317
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2d740317
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2d740317

Branch: refs/heads/0.8
Commit: 2d7403174f8fd6d35fce05585a7ee6edb8af7dd4
Parents: ede8587
Author: Swapnil Ghike <sr...@gmail.com>
Authored: Thu Apr 25 18:57:31 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Apr 25 18:57:31 2013 -0700

----------------------------------------------------------------------
 config/consumer.properties                         |    6 +-
 config/producer.properties                         |    2 +-
 config/server.properties                           |    6 +-
 .../main/java/kafka/etl/impl/DataGenerator.java    |    2 +-
 contrib/hadoop-producer/README.md                  |    2 +-
 .../kafka/bridge/hadoop/KafkaOutputFormat.java     |    4 +-
 core/src/main/scala/kafka/client/ClientUtils.scala |    4 +-
 .../scala/kafka/consumer/ConsoleConsumer.scala     |    2 +-
 .../scala/kafka/consumer/ConsumerConnector.scala   |    4 +-
 .../scala/kafka/producer/ConsoleProducer.scala     |    2 +-
 .../scala/kafka/producer/KafkaLog4jAppender.scala  |    4 +-
 .../main/scala/kafka/producer/ProducerConfig.scala |    2 +-
 .../main/scala/kafka/tools/ReplayLogProducer.scala |    4 +-
 .../kafka/tools/VerifyConsumerRebalance.scala      |    2 +-
 core/src/main/scala/kafka/utils/Utils.scala        |    2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala      |    8 ++--
 .../scala/other/kafka/TestEndToEndLatency.scala    |    4 +-
 .../consumer/ZookeeperConsumerConnectorTest.scala  |    4 +-
 .../test/scala/unit/kafka/log/LogOffsetTest.scala  |    2 +-
 .../unit/kafka/producer/AsyncProducerTest.scala    |   18 +++---
 .../scala/unit/kafka/producer/ProducerTest.scala   |   12 ++--
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   12 ++--
 .../src/main/java/kafka/examples/Consumer.java     |    6 +-
 .../src/main/java/kafka/examples/Producer.java     |    2 +-
 .../scala/kafka/perf/ConsumerPerformance.scala     |    2 +-
 .../scala/kafka/perf/ProducerPerformance.scala     |    2 +-
 .../config/migration_producer.properties           |    2 +-
 .../config/server.properties                       |    2 +
 .../testcase_9001/testcase_9001_properties.json    |    6 +-
 .../testcase_9003/testcase_9003_properties.json    |    6 +-
 .../testcase_9004/testcase_9004_properties.json    |    6 +-
 .../testcase_9005/testcase_9005_properties.json    |    6 +-
 .../testcase_9006/testcase_9006_properties.json    |    6 +-
 .../config/mirror_consumer.properties              |    6 +-
 .../config/mirror_producer.properties              |    2 +-
 .../config/server.properties                       |    4 +-
 .../replication_testsuite/config/server.properties |    4 +-
 system_test/utils/kafka_system_test_utils.py       |   43 +++++++++------
 38 files changed, 111 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/config/consumer.properties
----------------------------------------------------------------------
diff --git a/config/consumer.properties b/config/consumer.properties
index 9dbd583..7343cbc 100644
--- a/config/consumer.properties
+++ b/config/consumer.properties
@@ -14,13 +14,13 @@
 # limitations under the License.
 # see kafka.consumer.ConsumerConfig for more details
 
-# zk connection string
+# Zookeeper connection string
 # comma separated host:port pairs, each corresponding to a zk
 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=127.0.0.1:2181
+zookeeper.connect=127.0.0.1:2181
 
 # timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
+zookeeper.connection.timeout.ms=1000000
 
 #consumer group id
 group.id=test-consumer-group

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/config/producer.properties
----------------------------------------------------------------------
diff --git a/config/producer.properties b/config/producer.properties
index cc8f5f6..162b8a6 100644
--- a/config/producer.properties
+++ b/config/producer.properties
@@ -18,7 +18,7 @@
 
 # list of brokers used for bootstrapping
 # format: host1:port1,host2:port2 ...
-broker.list=localhost:9092
+metadata.broker.list=localhost:9092
 
 # name of the partitioner class for partitioning events; default partition spreads data randomly
 #partitioner.class=

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index 04408dd..bc6a521 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -97,15 +97,15 @@ log.cleanup.interval.mins=1
 
 ############################# Zookeeper #############################
 
-# Zk connection string (see zk docs for details).
+# Zookeeper connection string (see zookeeper docs for details).
 # This is a comma separated host:port pairs, each corresponding to a zk
 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
 # You can also append an optional chroot string to the urls to specify the
 # root directory for all kafka znodes.
-zk.connect=localhost:2181
+zookeeper.connect=localhost:2181
 
 # Timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
+zookeeper.connection.timeout.ms=1000000
 
 # metrics reporter properties
 kafka.metrics.polling.interval.secs=5

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
index df17978..4b1d117 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
@@ -70,7 +70,7 @@ public class DataGenerator {
 		
 		System.out.println("server uri:" + _uri.toString());
         Properties producerProps = new Properties();
-        producerProps.put("broker.list", String.format("%s:%d", _uri.getHost(), _uri.getPort()));
+        producerProps.put("metadata.broker.list", String.format("%s:%d", _uri.getHost(), _uri.getPort()));
         producerProps.put("send.buffer.bytes", String.valueOf(TCP_BUFFER_SIZE));
         producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT));
         producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL));

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/contrib/hadoop-producer/README.md
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/README.md b/contrib/hadoop-producer/README.md
index 1bd3721..547c1ef 100644
--- a/contrib/hadoop-producer/README.md
+++ b/contrib/hadoop-producer/README.md
@@ -87,7 +87,7 @@ compression codec, one would add the "kafka.output.compression.codec" parameter
 compression). 
 
 For easier debugging, the above values as well as the Kafka broker information
-(kafka.broker.list), the topic (kafka.output.topic), and the schema
+(kafka.metadata.broker.list), the topic (kafka.output.topic), and the schema
 (kafka.output.schema) are injected into the job's configuration. By default,
 the Hadoop producer uses Kafka's sync producer as asynchronous operation
 doesn't make sense in the batch Hadoop case.

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
index aa1f944..32f096c 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
@@ -124,8 +124,8 @@ public class KafkaOutputFormat<K, V> extends OutputFormat<K, V>
       // URL: kafka://<kafka host>/<topic>
       // e.g. kafka://kafka-server:9000,kafka-server2:9000/foobar
       String brokerList = uri.getAuthority();
-      props.setProperty("broker.list", brokerList);
-      job.set(KAFKA_CONFIG_PREFIX + ".broker.list", brokerList);
+      props.setProperty("metadata.broker.list", brokerList);
+      job.set(KAFKA_CONFIG_PREFIX + ".metadata.broker.list", brokerList);
 
       if (uri.getPath() == null || uri.getPath().length() <= 1)
         throw new KafkaException("no topic specified in kafka uri");

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index a3d88ea..7b3f09d 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -16,7 +16,7 @@ object ClientUtils extends Logging{
   /**
    * Used by the producer to send a metadata request since it has access to the ProducerConfig
    * @param topics The topics for which the metadata needs to be fetched
-   * @param brokers The brokers in the cluster as configured on the producer through broker.list
+   * @param brokers The brokers in the cluster as configured on the producer through metadata.broker.list
    * @param producerConfig The producer's config
    * @return topic metadata response
    */
@@ -60,7 +60,7 @@ object ClientUtils extends Logging{
    */
   def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int): TopicMetadataResponse = {
     val props = new Properties()
-    props.put("broker.list", brokers.map(_.getConnectionString()).mkString(","))
+    props.put("metadata.broker.list", brokers.map(_.getConnectionString()).mkString(","))
     props.put("client.id", clientId)
     props.put("request.timeout.ms", timeoutMs.toString)
     val producerConfig = new ProducerConfig(props)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index d6c4a51..e2b0041 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -158,7 +158,7 @@ object ConsoleConsumer extends Logging {
     props.put("auto.commit.enable", "true")
     props.put("auto.commit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString)
     props.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")
-    props.put("zk.connect", options.valueOf(zkConnectOpt))
+    props.put("zookeeper.connect", options.valueOf(zkConnectOpt))
     props.put("consumer.timeout.ms", options.valueOf(consumerTimeoutMsOpt).toString)
     val config = new ConsumerConfig(props)
     val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
index d8c23f2..13c3f77 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
@@ -83,7 +83,7 @@ object Consumer extends Logging {
    *  Create a ConsumerConnector
    *
    *  @param config  at the minimum, need to specify the groupid of the consumer and the zookeeper
-   *                 connection string zk.connect.
+   *                 connection string zookeeper.connect.
    */
   def create(config: ConsumerConfig): ConsumerConnector = {
     val consumerConnect = new ZookeeperConsumerConnector(config)
@@ -94,7 +94,7 @@ object Consumer extends Logging {
    *  Create a ConsumerConnector
    *
    *  @param config  at the minimum, need to specify the groupid of the consumer and the zookeeper
-   *                 connection string zk.connect.
+   *                 connection string zookeeper.connect.
    */
   def createJavaConsumerConnector(config: ConsumerConfig): kafka.javaapi.consumer.ConsumerConnector = {
     val consumerConnect = new kafka.javaapi.consumer.ZookeeperConsumerConnector(config)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 9c2260b..5539bce 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -126,7 +126,7 @@ object ConsoleProducer {
     cmdLineProps.put("topic", topic)
 
     val props = new Properties()
-    props.put("broker.list", brokerList)
+    props.put("metadata.broker.list", brokerList)
     val codec = if(compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
     props.put("compression.codec", codec.toString)
     props.put("producer.type", if(sync) "sync" else "async")

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
index 3d22e6d..88ae784 100644
--- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
+++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
@@ -64,9 +64,9 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
     // check for config parameter validity
     val props = new Properties()
     if(brokerList != null)
-      props.put("broker.list", brokerList)
+      props.put("metadata.broker.list", brokerList)
     if(props.isEmpty)
-      throw new MissingConfigException("The broker.list property should be specified")
+      throw new MissingConfigException("The metadata.broker.list property should be specified")
     if(topic == null)
       throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
     if(serializerClass == null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/main/scala/kafka/producer/ProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala
index e27ec44..7947b18 100644
--- a/core/src/main/scala/kafka/producer/ProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala
@@ -63,7 +63,7 @@ class ProducerConfig private (val props: VerifiableProperties)
    * format is host1:port1,host2:port2, and the list can be a subset of brokers or
    * a VIP pointing to a subset of brokers.
    */
-  val brokerList = props.getString("broker.list")
+  val brokerList = props.getString("metadata.broker.list")
 
   /** the partitioner class for partitioning events amongst sub-topics */
   val partitionerClass = props.getString("partitioner.class", "kafka.producer.DefaultPartitioner")

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index d744a78..814d61a 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -43,7 +43,7 @@ object ReplayLogProducer extends Logging {
     // consumer properties
     val consumerProps = new Properties
     consumerProps.put("group.id", GroupId)
-    consumerProps.put("zk.connect", config.zkConnect)
+    consumerProps.put("zookeeper.connect", config.zkConnect)
     consumerProps.put("consumer.timeout.ms", "10000")
     consumerProps.put("auto.offset.reset", OffsetRequest.SmallestTimeString)
     consumerProps.put("fetch.message.max.bytes", (1024*1024).toString)
@@ -139,7 +139,7 @@ object ReplayLogProducer extends Logging {
   class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread with Logging {
     val shutdownLatch = new CountDownLatch(1)
     val props = new Properties()
-    props.put("broker.list", config.brokerList)
+    props.put("metadata.broker.list", config.brokerList)
     props.put("reconnect.interval", Integer.MAX_VALUE.toString)
     props.put("send.buffer.bytes", (64*1024).toString)
     props.put("compression.codec", config.compressionCodec.codec.toString)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
index d9c8bae..dc6d066 100644
--- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
+++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
@@ -25,7 +25,7 @@ object VerifyConsumerRebalance extends Logging {
   def main(args: Array[String]) {
     val parser = new OptionParser()
 
-    val zkConnectOpt = parser.accepts("zk.connect", "ZooKeeper connect string.").
+    val zkConnectOpt = parser.accepts("zookeeper.connect", "ZooKeeper connect string.").
       withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String]);
     val groupOpt = parser.accepts("group", "Consumer group.").
       withRequiredArg().ofType(classOf[String])

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index c639efb..e83eb5f 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -507,7 +507,7 @@ object Utils extends Logging {
     val builder = new StringBuilder
     builder.append("[ ")
     if (valueInQuotes)
-      builder.append(jsonData.map("\"" + _ + "\"")).mkString(", ")
+      builder.append(jsonData.map("\"" + _ + "\"").mkString(", "))
     else
       builder.append(jsonData.mkString(", "))
     builder.append(" ]")

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index bd93ff1..7971a09 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -772,14 +772,14 @@ class ZKGroupTopicDirs(group: String, topic: String) extends ZKGroupDirs(group)
 
 class ZKConfig(props: VerifiableProperties) {
   /** ZK host string */
-  val zkConnect = props.getString("zk.connect", null)
+  val zkConnect = props.getString("zookeeper.connect", null)
 
   /** zookeeper session timeout */
-  val zkSessionTimeoutMs = props.getInt("zk.session.timeout.ms", 6000)
+  val zkSessionTimeoutMs = props.getInt("zookeeper.session.timeout.ms", 6000)
 
   /** the max time that the client waits to establish a connection to zookeeper */
-  val zkConnectionTimeoutMs = props.getInt("zk.connection.timeout.ms",zkSessionTimeoutMs)
+  val zkConnectionTimeoutMs = props.getInt("zookeeper.connection.timeout.ms",zkSessionTimeoutMs)
 
   /** how far a ZK follower can be behind a ZK leader */
-  val zkSyncTimeMs = props.getInt("zk.sync.time.ms", 2000)
+  val zkSyncTimeMs = props.getInt("zookeeper.sync.time.ms", 2000)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
index 98c12b7..c4aed10 100644
--- a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
+++ b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
@@ -38,7 +38,7 @@ object TestEndToEndLatency {
     consumerProps.put("group.id", topic)
     consumerProps.put("auto.commit", "true")
     consumerProps.put("auto.offset.reset", "largest")
-    consumerProps.put("zk.connect", zkConnect)
+    consumerProps.put("zookeeper.connect", zkConnect)
     consumerProps.put("socket.timeout.ms", 1201000.toString)
     
     val config = new ConsumerConfig(consumerProps)
@@ -47,7 +47,7 @@ object TestEndToEndLatency {
     val iter = stream.iterator
 
     val producerProps = new Properties()
-    producerProps.put("broker.list", brokerList)
+    producerProps.put("metadata.broker.list", brokerList)
     producerProps.put("producer.type", "sync")
     val producer = new Producer[Any, Any](new ProducerConfig(producerProps))
     

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index f7ee914..86d30ad 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -330,7 +330,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
                                     compression: CompressionCodec = NoCompressionCodec): List[String] = {
     val header = "test-%d-%d".format(config.brokerId, partition)
     val props = new Properties()
-    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
+    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
     props.put("compression.codec", compression.codec.toString)
     props.put("key.serializer.class", classOf[IntEncoder].getName.toString)
@@ -350,7 +350,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
                    numParts: Int): List[String]= {
     var messages: List[String] = Nil
     val props = new Properties()
-    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
+    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
     props.put("key.serializer.class", classOf[IntEncoder].getName.toString)
     props.put("serializer.class", classOf[StringEncoder].getName)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
index b343d98..1a9cc01 100644
--- a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
@@ -207,7 +207,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     props.put("log.retention.hours", "10")
     props.put("log.cleanup.interval.mins", "5")
     props.put("log.segment.bytes", logSize.toString)
-    props.put("zk.connect", zkConnect.toString)
+    props.put("zookeeper.connect", zkConnect.toString)
     props
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 922a200..7f7a8d7 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -61,7 +61,7 @@ class AsyncProducerTest extends JUnit3Suite {
 
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
+    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     props.put("producer.type", "async")
     props.put("queue.buffering.max.messages", "10")
     props.put("batch.num.messages", "1")
@@ -86,7 +86,7 @@ class AsyncProducerTest extends JUnit3Suite {
   def testProduceAfterClosed() {
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
+    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     props.put("producer.type", "async")
     props.put("batch.num.messages", "1")
 
@@ -165,7 +165,7 @@ class AsyncProducerTest extends JUnit3Suite {
     producerDataList.append(new KeyedMessage[Int,Message]("topic2", 4, new Message("msg5".getBytes)))
 
     val props = new Properties()
-    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
+    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     val broker1 = new Broker(0, "localhost", 9092)
     val broker2 = new Broker(1, "localhost", 9093)
     broker1
@@ -215,7 +215,7 @@ class AsyncProducerTest extends JUnit3Suite {
   def testSerializeEvents() {
     val produceData = TestUtils.getMsgStrings(5).map(m => new KeyedMessage[String,String]("topic1",m))
     val props = new Properties()
-    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
+    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     val config = new ProducerConfig(props)
     // form expected partitions metadata
     val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
@@ -241,7 +241,7 @@ class AsyncProducerTest extends JUnit3Suite {
     val producerDataList = new ArrayBuffer[KeyedMessage[String,Message]]
     producerDataList.append(new KeyedMessage[String,Message]("topic1", "key1", new Message("msg1".getBytes)))
     val props = new Properties()
-    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
+    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     val config = new ProducerConfig(props)
 
     // form expected partitions metadata
@@ -270,7 +270,7 @@ class AsyncProducerTest extends JUnit3Suite {
   @Test
   def testNoBroker() {
     val props = new Properties()
-    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
+    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
 
     val config = new ProducerConfig(props)
     // create topic metadata with 0 partitions
@@ -301,7 +301,7 @@ class AsyncProducerTest extends JUnit3Suite {
   @Test
   def testIncompatibleEncoder() {
     val props = new Properties()
-    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
+    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     val config = new ProducerConfig(props)
 
     val producer=new Producer[String, String](config)
@@ -318,7 +318,7 @@ class AsyncProducerTest extends JUnit3Suite {
   @Test
   def testRandomPartitioner() {
     val props = new Properties()
-    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
+    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     val config = new ProducerConfig(props)
 
     // create topic metadata with 0 partitions
@@ -392,7 +392,7 @@ class AsyncProducerTest extends JUnit3Suite {
   @Test
   def testFailedSendRetryLogic() {
     val props = new Properties()
-    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
+    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     props.put("request.required.acks", "1")
     props.put("serializer.class", classOf[StringEncoder].getName.toString)
     props.put("key.serializer.class", classOf[NullEncoder[Int]].getName.toString)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 04acef5..bc37531 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -93,7 +93,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
 
     val props1 = new util.Properties()
-    props1.put("broker.list", "localhost:80,localhost:81")
+    props1.put("metadata.broker.list", "localhost:80,localhost:81")
     props1.put("serializer.class", "kafka.serializer.StringEncoder")
     val producerConfig1 = new ProducerConfig(props1)
     val producer1 = new Producer[String, String](producerConfig1)
@@ -108,7 +108,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     }
 
     val props2 = new util.Properties()
-    props2.put("broker.list", "localhost:80," + TestUtils.getBrokerListStrFromConfigs(Seq( config1)))
+    props2.put("metadata.broker.list", "localhost:80," + TestUtils.getBrokerListStrFromConfigs(Seq( config1)))
     props2.put("serializer.class", "kafka.serializer.StringEncoder")
     val producerConfig2= new ProducerConfig(props2)
     val producer2 = new Producer[String, String](producerConfig2)
@@ -121,7 +121,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     }
 
     val props3 = new util.Properties()
-    props3.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    props3.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
     props3.put("serializer.class", "kafka.serializer.StringEncoder")
     val producerConfig3 = new ProducerConfig(props3)
     val producer3 = new Producer[String, String](producerConfig3)
@@ -139,7 +139,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     val props1 = new util.Properties()
     props1.put("serializer.class", "kafka.serializer.StringEncoder")
     props1.put("partitioner.class", "kafka.utils.StaticPartitioner")
-    props1.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    props1.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
     props1.put("request.required.acks", "2")
     props1.put("request.timeout.ms", "1000")
 
@@ -200,7 +200,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     props.put("partitioner.class", "kafka.utils.StaticPartitioner")
     props.put("request.timeout.ms", "2000")
     props.put("request.required.acks", "1")
-    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
 
     // create topic
     CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0")
@@ -257,7 +257,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     props.put("serializer.class", "kafka.serializer.StringEncoder")
     props.put("partitioner.class", "kafka.utils.StaticPartitioner")
     props.put("request.timeout.ms", String.valueOf(timeoutMs))
-    props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
     props.put("request.required.acks", "1")
 
     val config = new ProducerConfig(props)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 217ff7a..68c134e 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -128,7 +128,7 @@ object TestUtils extends Logging {
     props.put("port", port.toString)
     props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
     props.put("log.flush.interval.messages", "1")
-    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+    props.put("zookeeper.connect", TestZKUtils.zookeeperConnect)
     props.put("replica.socket.timeout.ms", "1500")
     props
   }
@@ -139,12 +139,12 @@ object TestUtils extends Logging {
   def createConsumerProperties(zkConnect: String, groupId: String, consumerId: String,
                                consumerTimeout: Long = -1): Properties = {
     val props = new Properties
-    props.put("zk.connect", zkConnect)
+    props.put("zookeeper.connect", zkConnect)
     props.put("group.id", groupId)
     props.put("consumer.id", consumerId)
     props.put("consumer.timeout.ms", consumerTimeout.toString)
-    props.put("zk.session.timeout.ms", "400")
-    props.put("zk.sync.time.ms", "200")
+    props.put("zookeeper.session.timeout.ms", "400")
+    props.put("zookeeper.sync.time.ms", "200")
     props.put("auto.commit.interval.ms", "1000")
     props.put("rebalance.max.retries", "4")
 
@@ -292,7 +292,7 @@ object TestUtils extends Logging {
                            encoder: Encoder[V] = new DefaultEncoder(), 
                            keyEncoder: Encoder[K] = new DefaultEncoder()): Producer[K, V] = {
     val props = new Properties()
-    props.put("broker.list", brokerList)
+    props.put("metadata.broker.list", brokerList)
     props.put("send.buffer.bytes", "65536")
     props.put("connect.timeout.ms", "100000")
     props.put("reconnect.interval", "10000")
@@ -303,7 +303,7 @@ object TestUtils extends Logging {
 
   def getProducerConfig(brokerList: String, partitioner: String = "kafka.producer.DefaultPartitioner"): Properties = {
     val props = new Properties()
-    props.put("broker.list", brokerList)
+    props.put("metadata.broker.list", brokerList)
     props.put("partitioner.class", partitioner)
     props.put("message.send.max.retries", "3")
     props.put("retry.backoff.ms", "1000")

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/examples/src/main/java/kafka/examples/Consumer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java
index 3460d36..63f099a 100644
--- a/examples/src/main/java/kafka/examples/Consumer.java
+++ b/examples/src/main/java/kafka/examples/Consumer.java
@@ -43,10 +43,10 @@ public class Consumer extends Thread
   private static ConsumerConfig createConsumerConfig()
   {
     Properties props = new Properties();
-    props.put("zk.connect", KafkaProperties.zkConnect);
+    props.put("zookeeper.connect", KafkaProperties.zkConnect);
     props.put("group.id", KafkaProperties.groupId);
-    props.put("zk.session.timeout.ms", "400");
-    props.put("zk.sync.time.ms", "200");
+    props.put("zookeeper.session.timeout.ms", "400");
+    props.put("zookeeper.sync.time.ms", "200");
     props.put("auto.commit.interval.ms", "1000");
 
     return new ConsumerConfig(props);

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/examples/src/main/java/kafka/examples/Producer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java
index a770a18..96e9893 100644
--- a/examples/src/main/java/kafka/examples/Producer.java
+++ b/examples/src/main/java/kafka/examples/Producer.java
@@ -30,7 +30,7 @@ public class Producer extends Thread
   public Producer(String topic)
   {
     props.put("serializer.class", "kafka.serializer.StringEncoder");
-    props.put("broker.list", "localhost:9092");
+    props.put("metadata.broker.list", "localhost:9092");
     // Use random partitioner. Don't need the key type. Just set it to Integer.
     // The message is of type String.
     producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
index ee2ce95..3158a22 100644
--- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
@@ -128,7 +128,7 @@ object ConsumerPerformance {
     props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)
     props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString)
     props.put("auto.offset.reset", if(options.has(resetBeginningOffsetOpt)) "largest" else "smallest")
-    props.put("zk.connect", options.valueOf(zkConnectOpt))
+    props.put("zookeeper.connect", options.valueOf(zkConnectOpt))
     props.put("consumer.timeout.ms", "5000")
     val consumerConfig = new ConsumerConfig(props)
     val numThreads = options.valueOf(numThreadsOpt).intValue

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index 851a99e..ad2ac26 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -174,7 +174,7 @@ object ProducerPerformance extends Logging {
                        val allDone: CountDownLatch,
                        val rand: Random) extends Runnable {
     val props = new Properties()
-    props.put("broker.list", config.brokerList)
+    props.put("metadata.broker.list", config.brokerList)
     props.put("compression.codec", config.compressionCodec.codec.toString)
     props.put("reconnect.interval", Integer.MAX_VALUE.toString)
     props.put("send.buffer.bytes", (64*1024).toString)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/system_test/migration_tool_testsuite/config/migration_producer.properties
----------------------------------------------------------------------
diff --git a/system_test/migration_tool_testsuite/config/migration_producer.properties b/system_test/migration_tool_testsuite/config/migration_producer.properties
index af080ae..1750807 100644
--- a/system_test/migration_tool_testsuite/config/migration_producer.properties
+++ b/system_test/migration_tool_testsuite/config/migration_producer.properties
@@ -20,7 +20,7 @@
 
 # configure brokers statically
 # format: host1:port1,host2:port2 ...
-broker.list=localhost:9094,localhost:9095,localhost:9096
+metadata.broker.list=localhost:9094,localhost:9095,localhost:9096
 
 # discover brokers from ZK
 #zk.connect=

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/system_test/migration_tool_testsuite/config/server.properties
----------------------------------------------------------------------
diff --git a/system_test/migration_tool_testsuite/config/server.properties b/system_test/migration_tool_testsuite/config/server.properties
index d231d4c..6ecbb71 100644
--- a/system_test/migration_tool_testsuite/config/server.properties
+++ b/system_test/migration_tool_testsuite/config/server.properties
@@ -115,8 +115,10 @@ enable.zookeeper=true
 # You can also append an optional chroot string to the urls to specify the
 # root directory for all kafka znodes.
 zk.connect=localhost:2181
+zookeeper.connect=localhost:2181
 
 # Timeout in ms for connecting to zookeeper
 zk.connection.timeout.ms=1000000
+zookeeper.connection.timeout.ms=1000000
 
 monitoring.period.secs=1

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/system_test/migration_tool_testsuite/testcase_9001/testcase_9001_properties.json
----------------------------------------------------------------------
diff --git a/system_test/migration_tool_testsuite/testcase_9001/testcase_9001_properties.json b/system_test/migration_tool_testsuite/testcase_9001/testcase_9001_properties.json
index cf84caa..1904ab5 100644
--- a/system_test/migration_tool_testsuite/testcase_9001/testcase_9001_properties.json
+++ b/system_test/migration_tool_testsuite/testcase_9001/testcase_9001_properties.json
@@ -29,7 +29,7 @@
       "entity_id": "1",
       "port": "9091",
       "brokerid": "1",
-      "07_client": "true",
+      "version": "0.7",
       "log.file.size": "51200",
       "log.dir": "/tmp/kafka_server_1_logs",
       "log_filename": "kafka_server_1.log",
@@ -39,7 +39,7 @@
       "entity_id": "2",
       "port": "9092",
       "brokerid": "2",
-      "07_client": "true",
+      "version": "0.7",
       "log.file.size": "51200",
       "log.dir": "/tmp/kafka_server_2_logs",
       "log_filename": "kafka_server_2.log",
@@ -49,7 +49,7 @@
       "entity_id": "3",
       "port": "9093",
       "brokerid": "3",
-      "07_client": "true",
+      "version": "0.7",
       "log.file.size": "51200",
       "log.dir": "/tmp/kafka_server_3_logs",
       "log_filename": "kafka_server_3.log",

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/system_test/migration_tool_testsuite/testcase_9003/testcase_9003_properties.json
----------------------------------------------------------------------
diff --git a/system_test/migration_tool_testsuite/testcase_9003/testcase_9003_properties.json b/system_test/migration_tool_testsuite/testcase_9003/testcase_9003_properties.json
index 0b413c4..8cacc69 100644
--- a/system_test/migration_tool_testsuite/testcase_9003/testcase_9003_properties.json
+++ b/system_test/migration_tool_testsuite/testcase_9003/testcase_9003_properties.json
@@ -30,7 +30,7 @@
       "entity_id": "1",
       "port": "9091",
       "brokerid": "1",
-      "07_client": "true",
+      "version": "0.7",
       "log.file.size": "51200",
       "log.dir": "/tmp/kafka_server_1_logs",
       "log_filename": "kafka_server_1.log",
@@ -40,7 +40,7 @@
       "entity_id": "2",
       "port": "9092",
       "brokerid": "2",
-      "07_client": "true",
+      "version": "0.7",
       "log.file.size": "51200",
       "log.dir": "/tmp/kafka_server_2_logs",
       "log_filename": "kafka_server_2.log",
@@ -50,7 +50,7 @@
       "entity_id": "3",
       "port": "9093",
       "brokerid": "3",
-      "07_client": "true",
+      "version": "0.7",
       "log.file.size": "51200",
       "log.dir": "/tmp/kafka_server_3_logs",
       "log_filename": "kafka_server_3.log",

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/system_test/migration_tool_testsuite/testcase_9004/testcase_9004_properties.json
----------------------------------------------------------------------
diff --git a/system_test/migration_tool_testsuite/testcase_9004/testcase_9004_properties.json b/system_test/migration_tool_testsuite/testcase_9004/testcase_9004_properties.json
index 5c6baaf..4dbd80b 100644
--- a/system_test/migration_tool_testsuite/testcase_9004/testcase_9004_properties.json
+++ b/system_test/migration_tool_testsuite/testcase_9004/testcase_9004_properties.json
@@ -30,7 +30,7 @@
       "entity_id": "1",
       "port": "9091",
       "brokerid": "1",
-      "07_client": "true",
+      "version": "0.7",
       "log.file.size": "51200",
       "log.dir": "/tmp/kafka_server_1_logs",
       "log_filename": "kafka_server_1.log",
@@ -40,7 +40,7 @@
       "entity_id": "2",
       "port": "9092",
       "brokerid": "2",
-      "07_client": "true",
+      "version": "0.7",
       "log.file.size": "51200",
       "log.dir": "/tmp/kafka_server_2_logs",
       "log_filename": "kafka_server_2.log",
@@ -50,7 +50,7 @@
       "entity_id": "3",
       "port": "9093",
       "brokerid": "3",
-      "07_client": "true",
+      "version": "0.7",
       "log.file.size": "51200",
       "log.dir": "/tmp/kafka_server_3_logs",
       "log_filename": "kafka_server_3.log",

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/system_test/migration_tool_testsuite/testcase_9005/testcase_9005_properties.json
----------------------------------------------------------------------
diff --git a/system_test/migration_tool_testsuite/testcase_9005/testcase_9005_properties.json b/system_test/migration_tool_testsuite/testcase_9005/testcase_9005_properties.json
index 8597e1a..e46b453 100644
--- a/system_test/migration_tool_testsuite/testcase_9005/testcase_9005_properties.json
+++ b/system_test/migration_tool_testsuite/testcase_9005/testcase_9005_properties.json
@@ -39,7 +39,7 @@
       "entity_id": "2",
       "port": "9091",
       "brokerid": "1",
-      "07_client": "true",
+      "version": "0.7",
       "log.file.size": "51200",
       "log.dir": "/tmp/kafka_server_2_logs",
       "log_filename": "kafka_server_2.log",
@@ -49,7 +49,7 @@
       "entity_id": "3",
       "port": "9092",
       "brokerid": "2",
-      "07_client": "true",
+      "version": "0.7",
       "log.file.size": "51200",
       "log.dir": "/tmp/kafka_server_3_logs",
       "log_filename": "kafka_server_3.log",
@@ -59,7 +59,7 @@
       "entity_id": "4",
       "port": "9093",
       "brokerid": "3",
-      "07_client": "true",
+      "version": "0.7",
       "log.file.size": "51200",
       "log.dir": "/tmp/kafka_server_4_logs",
       "log_filename": "kafka_server_4.log",

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/system_test/migration_tool_testsuite/testcase_9006/testcase_9006_properties.json
----------------------------------------------------------------------
diff --git a/system_test/migration_tool_testsuite/testcase_9006/testcase_9006_properties.json b/system_test/migration_tool_testsuite/testcase_9006/testcase_9006_properties.json
index ec3290f..10f5955 100644
--- a/system_test/migration_tool_testsuite/testcase_9006/testcase_9006_properties.json
+++ b/system_test/migration_tool_testsuite/testcase_9006/testcase_9006_properties.json
@@ -39,7 +39,7 @@
       "entity_id": "2",
       "port": "9091",
       "brokerid": "1",
-      "07_client": "true",
+      "version": "0.7",
       "log.file.size": "51200",
       "log.dir": "/tmp/kafka_server_2_logs",
       "log_filename": "kafka_server_2.log",
@@ -49,7 +49,7 @@
       "entity_id": "3",
       "port": "9092",
       "brokerid": "2",
-      "07_client": "true",
+      "version": "0.7",
       "log.file.size": "51200",
       "log.dir": "/tmp/kafka_server_3_logs",
       "log_filename": "kafka_server_3.log",
@@ -59,7 +59,7 @@
       "entity_id": "4",
       "port": "9093",
       "brokerid": "3",
-      "07_client": "true",
+      "version": "0.7",
       "log.file.size": "51200",
       "log.dir": "/tmp/kafka_server_4_logs",
       "log_filename": "kafka_server_4.log",

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/system_test/mirror_maker_testsuite/config/mirror_consumer.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/config/mirror_consumer.properties b/system_test/mirror_maker_testsuite/config/mirror_consumer.properties
index bb1a1cc..e90634a 100644
--- a/system_test/mirror_maker_testsuite/config/mirror_consumer.properties
+++ b/system_test/mirror_maker_testsuite/config/mirror_consumer.properties
@@ -1,5 +1,5 @@
-zk.connect=localhost:2108
-zk.connection.timeout.ms=1000000
+zookeeper.connect=localhost:2108
+zookeeper.connection.timeout.ms=1000000
 group.id=mm_regtest_grp
 auto.commit.interval.ms=120000
 auto.offset.reset=smallest
@@ -8,5 +8,5 @@ auto.offset.reset=smallest
 #rebalance.backoff.ms=2000
 socket.receive.buffer.bytes=1048576
 fetch.message.max.bytes=1048576
-zk.sync.time.ms=15000
+zookeeper.sync.time.ms=15000
 shallow.iterator.enable=false

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/system_test/mirror_maker_testsuite/config/mirror_producer.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/config/mirror_producer.properties b/system_test/mirror_maker_testsuite/config/mirror_producer.properties
index bdb657f..b2bf2c2 100644
--- a/system_test/mirror_maker_testsuite/config/mirror_producer.properties
+++ b/system_test/mirror_maker_testsuite/config/mirror_producer.properties
@@ -1,6 +1,6 @@
 producer.type=async
 queue.enqueue.timeout.ms=-1
-broker.list=localhost:9094
+metadata.broker.list=localhost:9094
 compression.codec=0
 message.send.max.retries=3
 request.required.acks=1

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/system_test/mirror_maker_testsuite/config/server.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/config/server.properties b/system_test/mirror_maker_testsuite/config/server.properties
index dacf158..36dd68d 100644
--- a/system_test/mirror_maker_testsuite/config/server.properties
+++ b/system_test/mirror_maker_testsuite/config/server.properties
@@ -114,10 +114,10 @@ enable.zookeeper=true
 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
 # You can also append an optional chroot string to the urls to specify the
 # root directory for all kafka znodes.
-zk.connect=localhost:2181
+zookeeper.connect=localhost:2181
 
 # Timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
+zookeeper.connection.timeout.ms=1000000
 
 monitoring.period.secs=1
 message.max.bytes=1000000

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/system_test/replication_testsuite/config/server.properties
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/config/server.properties b/system_test/replication_testsuite/config/server.properties
index dacf158..36dd68d 100644
--- a/system_test/replication_testsuite/config/server.properties
+++ b/system_test/replication_testsuite/config/server.properties
@@ -114,10 +114,10 @@ enable.zookeeper=true
 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
 # You can also append an optional chroot string to the urls to specify the
 # root directory for all kafka znodes.
-zk.connect=localhost:2181
+zookeeper.connect=localhost:2181
 
 # Timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
+zookeeper.connection.timeout.ms=1000000
 
 monitoring.period.secs=1
 message.max.bytes=1000000

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/system_test/utils/kafka_system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index dd082f5..ae393bc 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -313,8 +313,8 @@ def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv
     logger.info("testcase config (dest)   pathname : " + cfgDestPathname, extra=d)
 
     # loop through all zookeepers (if more than 1) to retrieve host and clientPort
-    # to construct a zk.connect str for broker in the form of:
-    # zk.connect=<host1>:<port1>,<host2>:<port2>,...
+    # to construct a ZooKeeper connect str (zookeeper.connect, or zk.connect for 0.7 brokers):
+    # <host1>:<port1>,<host2>:<port2>,...
     testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]        = ""
     testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]        = ""
     testcaseEnv.userDefinedEnvVarDict["sourceZkEntityIdList"]      = []
@@ -409,28 +409,35 @@ def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv
 
                 # copy the associated .properties template, update values, write to testcase_<xxx>/config
 
-                if ( clusterCfg["role"] == "broker" ):
-                    if clusterCfg["cluster_name"] == "source":
-                        tcCfg["zk.connect"] = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
-                    elif clusterCfg["cluster_name"] == "target":
-                        tcCfg["zk.connect"] = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]
-                    else:
-                        logger.error("Unknown cluster name: " + clusterName, extra=d)
-                        sys.exit(1)
-
-                    zeroSevenClient = "false"
+                if (clusterCfg["role"] == "broker"):
+                    brokerVersion = "0.8"
                     try:
-                        zeroSevenClient = tcCfg["07_client"]
+                        brokerVersion = tcCfg["version"]
                     except:
                         pass
 
+                    if (brokerVersion == "0.7"):
+                        if clusterCfg["cluster_name"] == "source":
+                            tcCfg["zk.connect"] = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
+                        else:
+                            logger.error("Unknown cluster name for 0.7: " + clusterName, extra=d)
+                            sys.exit(1)
+                    else:
+                        if clusterCfg["cluster_name"] == "source":
+                            tcCfg["zookeeper.connect"] = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
+                        elif clusterCfg["cluster_name"] == "target":
+                            tcCfg["zookeeper.connect"] = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]
+                        else:
+                            logger.error("Unknown cluster name: " + clusterName, extra=d)
+                            sys.exit(1)
+
                     addedCSVConfig = {}
                     addedCSVConfig["kafka.csv.metrics.dir"] = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", clusterCfg["entity_id"], "metrics") 
                     addedCSVConfig["kafka.metrics.polling.interval.secs"] = "5" 
                     addedCSVConfig["kafka.metrics.reporters"] = "kafka.metrics.KafkaCSVMetricsReporter" 
                     addedCSVConfig["kafka.csv.metrics.reporter.enabled"] = "true"
 
-                    if zeroSevenClient == "true":
+                    if brokerVersion == "0.7":
                         addedCSVConfig["brokerid"] = tcCfg["brokerid"]
 
                     copy_file_with_dict_values(cfgTemplatePathname + "/server.properties",
@@ -450,12 +457,12 @@ def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv
                         sys.exit(1)
 
                 elif ( clusterCfg["role"] == "mirror_maker"):
-                    tcCfg["broker.list"] = testcaseEnv.userDefinedEnvVarDict["targetBrokerList"]
+                    tcCfg["metadata.broker.list"] = testcaseEnv.userDefinedEnvVarDict["targetBrokerList"]
                     copy_file_with_dict_values(cfgTemplatePathname + "/mirror_producer.properties",
                         cfgDestPathname + "/" + tcCfg["mirror_producer_config_filename"], tcCfg, None)
 
-                    # update zk.connect with the zk entities specified in cluster_config.json
-                    tcCfg["zk.connect"] = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
+                    # update zookeeper.connect with the zk entities specified in cluster_config.json
+                    tcCfg["zookeeper.connect"] = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
                     copy_file_with_dict_values(cfgTemplatePathname + "/mirror_consumer.properties",
                         cfgDestPathname + "/" + tcCfg["mirror_consumer_config_filename"], tcCfg, None)
                 
@@ -818,7 +825,7 @@ def start_console_consumer(systemTestEnv, testcaseEnv):
         if len(formatterOption) > 0:
             formatterOption = " --formatter " + formatterOption + " "
 
-        # get zk.connect
+        # get zookeeper connect string
         zkConnectStr = ""
         if clusterName == "source":
             zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]