You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2015/12/22 07:58:49 UTC

kafka git commit: KAFKA-3020; Ensure CheckStyle runs on all Java code

Repository: kafka
Updated Branches:
  refs/heads/trunk a0d21407c -> 64b746bd8


KAFKA-3020; Ensure CheckStyle runs on all Java code

- Adds CheckStyle to core and examples modules
- Fixes any existing CheckStyle issues

Author: Grant Henke <gr...@gmail.com>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #703 from granthenke/checkstyle-core


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/64b746bd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/64b746bd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/64b746bd

Branch: refs/heads/trunk
Commit: 64b746bd8b4dae17d7dd804f0e7161f304e2d8ee
Parents: a0d2140
Author: Grant Henke <gr...@gmail.com>
Authored: Mon Dec 21 22:48:03 2015 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Dec 21 22:48:03 2015 -0800

----------------------------------------------------------------------
 build.gradle                                    |  15 +-
 checkstyle/import-control-core.xml              |  69 ++
 .../javaapi/consumer/ConsumerConnector.java     | 112 +--
 .../scala/kafka/tools/KafkaMigrationTool.java   | 797 ++++++++++---------
 .../src/main/java/kafka/examples/Consumer.java  |  74 +-
 .../examples/KafkaConsumerProducerDemo.java     |  22 +-
 .../java/kafka/examples/KafkaProperties.java    |  31 +-
 .../src/main/java/kafka/examples/Producer.java  | 138 ++--
 .../java/kafka/examples/SimpleConsumerDemo.java | 110 +--
 9 files changed, 720 insertions(+), 648 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/64b746bd/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 224f550..b25aa95 100644
--- a/build.gradle
+++ b/build.gradle
@@ -259,6 +259,7 @@ project(':core') {
   println "Building project 'core' with Scala version $resolvedScalaVersion"
 
   apply plugin: 'scala'
+  apply plugin: 'checkstyle'
   archivesBaseName = "kafka_${baseScalaVersion}"
 
   dependencies {
@@ -334,8 +335,6 @@ project(':core') {
     into 'site-docs'
   }
 
-
-
   tasks.create(name: "releaseTarGz", dependsOn: configurations.archives.artifacts, type: Tar) {
     into "kafka_${baseScalaVersion}-${version}"
     compression = Compression.GZIP
@@ -390,15 +389,27 @@ project(':core') {
     }
     into "$buildDir/dependant-testlibs"
   }
+
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+    configProperties = [importControlFile: "$rootDir/checkstyle/import-control-core.xml"]
+  }
+  test.dependsOn('checkstyleMain', 'checkstyleTest')
 }
 
 project(':examples') {
+  apply plugin: 'checkstyle'
   archivesBaseName = "kafka-examples"
 
   dependencies {
     compile project(':core')
   }
 
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+    configProperties = [importControlFile: "$rootDir/checkstyle/import-control-core.xml"]
+  }
+  test.dependsOn('checkstyleMain', 'checkstyleTest')
 }
 
 project(':clients') {

http://git-wip-us.apache.org/repos/asf/kafka/blob/64b746bd/checkstyle/import-control-core.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
new file mode 100644
index 0000000..d53e9e8
--- /dev/null
+++ b/checkstyle/import-control-core.xml
@@ -0,0 +1,69 @@
+<!DOCTYPE import-control PUBLIC
+"-//Puppy Crawl//DTD Import Control 1.1//EN"
+"http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
+<!--
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+-->
+
+<import-control pkg="kafka">
+
+  <!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
+
+  <!-- common library dependencies -->
+  <allow pkg="java" />
+  <allow pkg="scala" />
+  <allow pkg="javax.management" />
+  <allow pkg="org.slf4j" />
+  <allow pkg="org.junit" />
+  <allow pkg="org.easymock" />
+  <allow pkg="java.security" />
+  <allow pkg="javax.net.ssl" />
+  <allow pkg="javax.security" />
+
+  <allow pkg="kafka.common" />
+  <allow pkg="kafka.utils" />
+  <allow pkg="kafka.serializer" />
+  <allow pkg="org.apache.kafka.common" />
+
+  <subpackage name="javaapi">
+    <subpackage name="consumer">
+      <allow pkg="kafka.consumer" />
+    </subpackage>
+
+    <subpackage name="message">
+      <allow pkg="kafka.message" />
+    </subpackage>
+
+    <subpackage name="producer">
+      <allow pkg="kafka.producer" />
+    </subpackage>
+  </subpackage>
+
+  <subpackage name="tools">
+    <allow pkg="kafka.javaapi" />
+    <allow pkg="kafka.producer" />
+    <allow pkg="kafka.consumer" />
+    <allow pkg="joptsimple" />
+  </subpackage>
+
+  <subpackage name="examples">
+    <allow pkg="org.apache.kafka.clients" />
+    <allow pkg="kafka.api" />
+    <allow pkg="kafka.javaapi" />
+    <allow pkg="kafka.message" />
+  </subpackage>
+
+</import-control>

http://git-wip-us.apache.org/repos/asf/kafka/blob/64b746bd/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
index 444cd1d..4de2a4c 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
+++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
@@ -17,72 +17,72 @@
 
 package kafka.javaapi.consumer;
 
-
-import java.util.List;
-import java.util.Map;
-
 import kafka.common.OffsetAndMetadata;
 import kafka.common.TopicAndPartition;
 import kafka.consumer.KafkaStream;
 import kafka.consumer.TopicFilter;
 import kafka.serializer.Decoder;
 
+import java.util.List;
+import java.util.Map;
+
 public interface ConsumerConnector {
-  /**
-   *  Create a list of MessageStreams of type T for each topic.
-   *
-   *  @param topicCountMap  a map of (topic, #streams) pair
-   *  @param keyDecoder a decoder that decodes the message key
-   *  @param valueDecoder a decoder that decodes the message itself
-   *  @return a map of (topic, list of  KafkaStream) pairs.
-   *          The number of items in the list is #streams. Each stream supports
-   *          an iterator over message/metadata pairs.
-   */
-  public <K,V> Map<String, List<KafkaStream<K,V>>> 
-    createMessageStreams(Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
-  
-  public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);
+    /**
+     *  Create a list of MessageStreams of type T for each topic.
+     *
+     *  @param topicCountMap  a map of (topic, #streams) pair
+     *  @param keyDecoder a decoder that decodes the message key
+     *  @param valueDecoder a decoder that decodes the message itself
+     *  @return a map of (topic, list of  KafkaStream) pairs.
+     *          The number of items in the list is #streams. Each stream supports
+     *          an iterator over message/metadata pairs.
+     */
+    public <K, V> Map<String, List<KafkaStream<K, V>>>
+        createMessageStreams(Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
+
+    public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);
+
+    /**
+     *  Create a list of MessageAndTopicStreams containing messages of type T.
+     *
+     *  @param topicFilter a TopicFilter that specifies which topics to
+     *                    subscribe to (encapsulates a whitelist or a blacklist).
+     *  @param numStreams the number of message streams to return.
+     *  @param keyDecoder a decoder that decodes the message key
+     *  @param valueDecoder a decoder that decodes the message itself
+     *  @return a list of KafkaStream. Each stream supports an
+     *          iterator over its MessageAndMetadata elements.
+     */
+    public <K, V> List<KafkaStream<K, V>>
+        createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
+
+    public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);
+
+    public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);
 
-  /**
-   *  Create a list of MessageAndTopicStreams containing messages of type T.
-   *
-   *  @param topicFilter a TopicFilter that specifies which topics to
-   *                    subscribe to (encapsulates a whitelist or a blacklist).
-   *  @param numStreams the number of message streams to return.
-   *  @param keyDecoder a decoder that decodes the message key
-   *  @param valueDecoder a decoder that decodes the message itself
-   *  @return a list of KafkaStream. Each stream supports an
-   *          iterator over its MessageAndMetadata elements.
-   */
-  public <K,V> List<KafkaStream<K,V>> 
-    createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
-  
-  public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);
-  
-  public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);
+    /**
+     *  Commit the offsets of all broker partitions connected by this connector.
+     */
+    public void commitOffsets();
 
-  /**
-   *  Commit the offsets of all broker partitions connected by this connector.
-   */
-  public void commitOffsets();
-  public void commitOffsets(boolean retryOnFailure);
+    public void commitOffsets(boolean retryOnFailure);
 
-  /**
-   *  Commit offsets using the provided offsets map
-   *
-   *  @param offsetsToCommit a map containing the offset to commit for each partition.
-   *  @param retryOnFailure enable retries on the offset commit if it fails.
-   */
-  public void commitOffsets(Map<TopicAndPartition, OffsetAndMetadata> offsetsToCommit, boolean retryOnFailure);
+    /**
+     *  Commit offsets using the provided offsets map
+     *
+     *  @param offsetsToCommit a map containing the offset to commit for each partition.
+     *  @param retryOnFailure enable retries on the offset commit if it fails.
+     */
+    public void commitOffsets(Map<TopicAndPartition, OffsetAndMetadata> offsetsToCommit, boolean retryOnFailure);
 
-  /**
-   * Wire in a consumer rebalance listener to be executed when consumer rebalance occurs.
-   * @param listener The consumer rebalance listener to wire in
-   */
-  public void setConsumerRebalanceListener(ConsumerRebalanceListener listener);
+    /**
+     * Wire in a consumer rebalance listener to be executed when consumer rebalance occurs.
+     * @param listener The consumer rebalance listener to wire in
+     */
+    public void setConsumerRebalanceListener(ConsumerRebalanceListener listener);
 
-  /**
-   *  Shut down the connector
-   */
-  public void shutdown();
+    /**
+     *  Shut down the connector
+     */
+    public void shutdown();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/64b746bd/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index f19df0c..b1ab649 100755
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -17,10 +17,15 @@
 
 package kafka.tools;
 
-import joptsimple.*;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
 import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
+import org.apache.kafka.common.utils.Utils;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -38,8 +43,6 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.kafka.common.utils.Utils;
-
 
 /**
  * This is a  kafka 0.7 to 0.8 online migration tool used for migrating data from 0.7 to 0.8 cluster. Internally,
@@ -58,429 +61,427 @@ import org.apache.kafka.common.utils.Utils;
  * the "serializer.class" config is set to "kafka.serializer.DefaultEncoder" by the code.
  */
 @SuppressWarnings({"unchecked", "rawtypes"})
-public class KafkaMigrationTool
-{
-  private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName());
-  private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "kafka.consumer.Consumer";
-  private static final String KAFKA_07_CONSUMER_CONFIG_CLASS_NAME = "kafka.consumer.ConsumerConfig";
-  private static final String KAFKA_07_CONSUMER_STREAM_CLASS_NAME = "kafka.consumer.KafkaStream";
-  private static final String KAFKA_07_CONSUMER_ITERATOR_CLASS_NAME = "kafka.consumer.ConsumerIterator";
-  private static final String KAFKA_07_CONSUMER_CONNECTOR_CLASS_NAME = "kafka.javaapi.consumer.ConsumerConnector";
-  private static final String KAFKA_07_MESSAGE_AND_METADATA_CLASS_NAME = "kafka.message.MessageAndMetadata";
-  private static final String KAFKA_07_MESSAGE_CLASS_NAME = "kafka.message.Message";
-  private static final String KAFKA_07_WHITE_LIST_CLASS_NAME = "kafka.consumer.Whitelist";
-  private static final String KAFKA_07_TOPIC_FILTER_CLASS_NAME = "kafka.consumer.TopicFilter";
-  private static final String KAFKA_07_BLACK_LIST_CLASS_NAME = "kafka.consumer.Blacklist";
-
-  private static Class<?> KafkaStaticConsumer_07 = null;
-  private static Class<?> ConsumerConfig_07 = null;
-  private static Class<?> ConsumerConnector_07 = null;
-  private static Class<?> KafkaStream_07 = null;
-  private static Class<?> TopicFilter_07 = null;
-  private static Class<?> WhiteList_07 = null;
-  private static Class<?> BlackList_07 = null;
-  private static Class<?> KafkaConsumerIteratorClass_07 = null;
-  private static Class<?> KafkaMessageAndMetatDataClass_07 = null;
-  private static Class<?> KafkaMessageClass_07 = null;
-
-  public static void main(String[] args) throws InterruptedException, IOException {
-    OptionParser parser = new OptionParser();
-    ArgumentAcceptingOptionSpec<String> consumerConfigOpt
-      = parser.accepts("consumer.config", "Kafka 0.7 consumer config to consume from the source 0.7 cluster. " + "You man specify multiple of these.")
-      .withRequiredArg()
-      .describedAs("config file")
-      .ofType(String.class);
-
-    ArgumentAcceptingOptionSpec<String> producerConfigOpt
-      =  parser.accepts("producer.config", "Producer config.")
-      .withRequiredArg()
-      .describedAs("config file")
-      .ofType(String.class);
-
-    ArgumentAcceptingOptionSpec<Integer> numProducersOpt
-      =  parser.accepts("num.producers", "Number of producer instances")
-      .withRequiredArg()
-      .describedAs("Number of producers")
-      .ofType(Integer.class)
-      .defaultsTo(1);
-
-    ArgumentAcceptingOptionSpec<String> zkClient01JarOpt
-      = parser.accepts("zkclient.01.jar", "zkClient 0.1 jar file")
-      .withRequiredArg()
-      .describedAs("zkClient 0.1 jar file required by Kafka 0.7")
-      .ofType(String.class);
-
-    ArgumentAcceptingOptionSpec<String> kafka07JarOpt
-      = parser.accepts("kafka.07.jar", "Kafka 0.7 jar file")
-      .withRequiredArg()
-      .describedAs("kafka 0.7 jar")
-      .ofType(String.class);
-
-    ArgumentAcceptingOptionSpec<Integer> numStreamsOpt
-      = parser.accepts("num.streams", "Number of consumer streams")
-      .withRequiredArg()
-      .describedAs("Number of consumer threads")
-      .ofType(Integer.class)
-      .defaultsTo(1);
-
-    ArgumentAcceptingOptionSpec<String> whitelistOpt
-      = parser.accepts("whitelist", "Whitelist of topics to migrate from the 0.7 cluster")
-      .withRequiredArg()
-      .describedAs("Java regex (String)")
-      .ofType(String.class);
-
-    ArgumentAcceptingOptionSpec<String> blacklistOpt
-      = parser.accepts("blacklist", "Blacklist of topics to migrate from the 0.7 cluster")
-      .withRequiredArg()
-      .describedAs("Java regex (String)")
-      .ofType(String.class);
-
-    ArgumentAcceptingOptionSpec<Integer> queueSizeOpt
-      =  parser.accepts("queue.size", "Number of messages that are buffered between the 0.7 consumer and 0.8 producer")
-      .withRequiredArg()
-      .describedAs("Queue size in terms of number of messages")
-      .ofType(Integer.class)
-      .defaultsTo(10000);
-
-    OptionSpecBuilder helpOpt
-      = parser.accepts("help", "Print this message.");
-
-    OptionSet options = parser.parse(args);
-
-    if (options.has(helpOpt)) {
-      parser.printHelpOn(System.out);
-      System.exit(0);
-    }
+public class KafkaMigrationTool {
+    private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName());
+    private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "kafka.consumer.Consumer";
+    private static final String KAFKA_07_CONSUMER_CONFIG_CLASS_NAME = "kafka.consumer.ConsumerConfig";
+    private static final String KAFKA_07_CONSUMER_STREAM_CLASS_NAME = "kafka.consumer.KafkaStream";
+    private static final String KAFKA_07_CONSUMER_ITERATOR_CLASS_NAME = "kafka.consumer.ConsumerIterator";
+    private static final String KAFKA_07_CONSUMER_CONNECTOR_CLASS_NAME = "kafka.javaapi.consumer.ConsumerConnector";
+    private static final String KAFKA_07_MESSAGE_AND_METADATA_CLASS_NAME = "kafka.message.MessageAndMetadata";
+    private static final String KAFKA_07_MESSAGE_CLASS_NAME = "kafka.message.Message";
+    private static final String KAFKA_07_WHITE_LIST_CLASS_NAME = "kafka.consumer.Whitelist";
+    private static final String KAFKA_07_TOPIC_FILTER_CLASS_NAME = "kafka.consumer.TopicFilter";
+    private static final String KAFKA_07_BLACK_LIST_CLASS_NAME = "kafka.consumer.Blacklist";
+
+    private static Class<?> kafkaStaticConsumer07 = null;
+    private static Class<?> consumerConfig07 = null;
+    private static Class<?> consumerConnector07 = null;
+    private static Class<?> kafkaStream07 = null;
+    private static Class<?> topicFilter07 = null;
+    private static Class<?> whiteList07 = null;
+    private static Class<?> blackList07 = null;
+    private static Class<?> kafkaConsumerIteratorClass07 = null;
+    private static Class<?> kafkaMessageAndMetaDataClass07 = null;
+    private static Class<?> kafkaMessageClass07 = null;
+
+    public static void main(String[] args) throws InterruptedException, IOException {
+        OptionParser parser = new OptionParser();
+        ArgumentAcceptingOptionSpec<String> consumerConfigOpt
+            = parser.accepts("consumer.config", "Kafka 0.7 consumer config to consume from the source 0.7 cluster. " + "You man specify multiple of these.")
+            .withRequiredArg()
+            .describedAs("config file")
+            .ofType(String.class);
+
+        ArgumentAcceptingOptionSpec<String> producerConfigOpt
+            = parser.accepts("producer.config", "Producer config.")
+            .withRequiredArg()
+            .describedAs("config file")
+            .ofType(String.class);
+
+        ArgumentAcceptingOptionSpec<Integer> numProducersOpt
+            = parser.accepts("num.producers", "Number of producer instances")
+            .withRequiredArg()
+            .describedAs("Number of producers")
+            .ofType(Integer.class)
+            .defaultsTo(1);
+
+        ArgumentAcceptingOptionSpec<String> zkClient01JarOpt
+            = parser.accepts("zkclient.01.jar", "zkClient 0.1 jar file")
+            .withRequiredArg()
+            .describedAs("zkClient 0.1 jar file required by Kafka 0.7")
+            .ofType(String.class);
+
+        ArgumentAcceptingOptionSpec<String> kafka07JarOpt
+            = parser.accepts("kafka.07.jar", "Kafka 0.7 jar file")
+            .withRequiredArg()
+            .describedAs("kafka 0.7 jar")
+            .ofType(String.class);
+
+        ArgumentAcceptingOptionSpec<Integer> numStreamsOpt
+            = parser.accepts("num.streams", "Number of consumer streams")
+            .withRequiredArg()
+            .describedAs("Number of consumer threads")
+            .ofType(Integer.class)
+            .defaultsTo(1);
+
+        ArgumentAcceptingOptionSpec<String> whitelistOpt
+            = parser.accepts("whitelist", "Whitelist of topics to migrate from the 0.7 cluster")
+            .withRequiredArg()
+            .describedAs("Java regex (String)")
+            .ofType(String.class);
+
+        ArgumentAcceptingOptionSpec<String> blacklistOpt
+            = parser.accepts("blacklist", "Blacklist of topics to migrate from the 0.7 cluster")
+            .withRequiredArg()
+            .describedAs("Java regex (String)")
+            .ofType(String.class);
+
+        ArgumentAcceptingOptionSpec<Integer> queueSizeOpt
+            = parser.accepts("queue.size", "Number of messages that are buffered between the 0.7 consumer and 0.8 producer")
+            .withRequiredArg()
+            .describedAs("Queue size in terms of number of messages")
+            .ofType(Integer.class)
+            .defaultsTo(10000);
+
+        OptionSpecBuilder helpOpt
+            = parser.accepts("help", "Print this message.");
+
+        OptionSet options = parser.parse(args);
+
+        if (options.has(helpOpt)) {
+            parser.printHelpOn(System.out);
+            System.exit(0);
+        }
 
-    checkRequiredArgs(parser, options, new OptionSpec[]{consumerConfigOpt, producerConfigOpt, zkClient01JarOpt, kafka07JarOpt});
-    int whiteListCount = options.has(whitelistOpt) ? 1 : 0;
-    int blackListCount = options.has(blacklistOpt) ? 1 : 0;
-    if(whiteListCount + blackListCount != 1) {
-      System.err.println("Exactly one of whitelist or blacklist is required.");
-      System.exit(1);
-    }
+        checkRequiredArgs(parser, options, new OptionSpec[]{consumerConfigOpt, producerConfigOpt, zkClient01JarOpt, kafka07JarOpt});
+        int whiteListCount = options.has(whitelistOpt) ? 1 : 0;
+        int blackListCount = options.has(blacklistOpt) ? 1 : 0;
+        if (whiteListCount + blackListCount != 1) {
+            System.err.println("Exactly one of whitelist or blacklist is required.");
+            System.exit(1);
+        }
 
-    String kafkaJarFile_07 = options.valueOf(kafka07JarOpt);
-    String zkClientJarFile = options.valueOf(zkClient01JarOpt);
-    String consumerConfigFile_07 = options.valueOf(consumerConfigOpt);
-    int numConsumers = options.valueOf(numStreamsOpt);
-    String producerConfigFile_08 = options.valueOf(producerConfigOpt);
-    int numProducers = options.valueOf(numProducersOpt);
-    final List<MigrationThread> migrationThreads = new ArrayList<MigrationThread>(numConsumers);
-    final List<ProducerThread> producerThreads = new ArrayList<ProducerThread>(numProducers);
-
-    try {
-      File kafkaJar_07 = new File(kafkaJarFile_07);
-      File zkClientJar = new File(zkClientJarFile);
-      ParentLastURLClassLoader c1 = new ParentLastURLClassLoader(new URL[] {
-        kafkaJar_07.toURI().toURL(),
-        zkClientJar.toURI().toURL()
-      });
-
-      /** Construct the 07 consumer config **/
-      ConsumerConfig_07 = c1.loadClass(KAFKA_07_CONSUMER_CONFIG_CLASS_NAME);
-      KafkaStaticConsumer_07 = c1.loadClass(KAFKA_07_STATIC_CONSUMER_CLASS_NAME);
-      ConsumerConnector_07 = c1.loadClass(KAFKA_07_CONSUMER_CONNECTOR_CLASS_NAME);
-      KafkaStream_07 = c1.loadClass(KAFKA_07_CONSUMER_STREAM_CLASS_NAME);
-      TopicFilter_07 = c1.loadClass(KAFKA_07_TOPIC_FILTER_CLASS_NAME);
-      WhiteList_07 = c1.loadClass(KAFKA_07_WHITE_LIST_CLASS_NAME);
-      BlackList_07 = c1.loadClass(KAFKA_07_BLACK_LIST_CLASS_NAME);
-      KafkaMessageClass_07 = c1.loadClass(KAFKA_07_MESSAGE_CLASS_NAME);
-      KafkaConsumerIteratorClass_07 = c1.loadClass(KAFKA_07_CONSUMER_ITERATOR_CLASS_NAME);
-      KafkaMessageAndMetatDataClass_07 = c1.loadClass(KAFKA_07_MESSAGE_AND_METADATA_CLASS_NAME);
-
-      Constructor ConsumerConfigConstructor_07 = ConsumerConfig_07.getConstructor(Properties.class);
-      Properties kafkaConsumerProperties_07 = new Properties();
-      kafkaConsumerProperties_07.load(new FileInputStream(consumerConfigFile_07));
-      /** Disable shallow iteration because the message format is different between 07 and 08, we have to get each individual message **/
-      if(kafkaConsumerProperties_07.getProperty("shallow.iterator.enable", "").equals("true")) {
-        log.warn("Shallow iterator should not be used in the migration tool");
-        kafkaConsumerProperties_07.setProperty("shallow.iterator.enable", "false");
-      }
-      Object consumerConfig_07 = ConsumerConfigConstructor_07.newInstance(kafkaConsumerProperties_07);
-
-      /** Construct the 07 consumer connector **/
-      Method ConsumerConnectorCreationMethod_07 = KafkaStaticConsumer_07.getMethod("createJavaConsumerConnector", ConsumerConfig_07);
-      final Object consumerConnector_07 = ConsumerConnectorCreationMethod_07.invoke(null, consumerConfig_07);
-      Method ConsumerConnectorCreateMessageStreamsMethod_07 = ConsumerConnector_07.getMethod(
-        "createMessageStreamsByFilter",
-        TopicFilter_07, int.class);
-      final Method ConsumerConnectorShutdownMethod_07 = ConsumerConnector_07.getMethod("shutdown");
-      Constructor WhiteListConstructor_07 = WhiteList_07.getConstructor(String.class);
-      Constructor BlackListConstructor_07 = BlackList_07.getConstructor(String.class);
-      Object filterSpec = null;
-      if(options.has(whitelistOpt))
-        filterSpec = WhiteListConstructor_07.newInstance(options.valueOf(whitelistOpt));
-      else
-        filterSpec = BlackListConstructor_07.newInstance(options.valueOf(blacklistOpt));
-
-      Object retKafkaStreams = ConsumerConnectorCreateMessageStreamsMethod_07.invoke(consumerConnector_07, filterSpec, numConsumers);
-
-      Properties kafkaProducerProperties_08 = new Properties();
-      kafkaProducerProperties_08.load(new FileInputStream(producerConfigFile_08));
-      kafkaProducerProperties_08.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
-      // create a producer channel instead
-      int queueSize = options.valueOf(queueSizeOpt);
-      ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<byte[], byte[]>>(queueSize);
-      int threadId = 0;
-
-      Runtime.getRuntime().addShutdownHook(new Thread() {
-        @Override
-        public void run() {
-          try {
-            ConsumerConnectorShutdownMethod_07.invoke(consumerConnector_07);
-          } catch(Exception e) {
-            log.error("Error while shutting down Kafka consumer", e);
-          }
-          for(MigrationThread migrationThread : migrationThreads) {
-            migrationThread.shutdown();
-          }
-          for(ProducerThread producerThread : producerThreads) {
-            producerThread.shutdown();
-          }
-          for(ProducerThread producerThread : producerThreads) {
-            producerThread.awaitShutdown();
-          }
-          log.info("Kafka migration tool shutdown successfully");
+        String kafkaJarFile07 = options.valueOf(kafka07JarOpt);
+        String zkClientJarFile = options.valueOf(zkClient01JarOpt);
+        String consumerConfigFile07 = options.valueOf(consumerConfigOpt);
+        int numConsumers = options.valueOf(numStreamsOpt);
+        String producerConfigFile08 = options.valueOf(producerConfigOpt);
+        int numProducers = options.valueOf(numProducersOpt);
+        final List<MigrationThread> migrationThreads = new ArrayList<MigrationThread>(numConsumers);
+        final List<ProducerThread> producerThreads = new ArrayList<ProducerThread>(numProducers);
+
+        try {
+            File kafkaJar07 = new File(kafkaJarFile07);
+            File zkClientJar = new File(zkClientJarFile);
+            ParentLastURLClassLoader c1 = new ParentLastURLClassLoader(new URL[]{
+                kafkaJar07.toURI().toURL(),
+                zkClientJar.toURI().toURL()
+            });
+
+            /** Construct the 07 consumer config **/
+            consumerConfig07 = c1.loadClass(KAFKA_07_CONSUMER_CONFIG_CLASS_NAME);
+            kafkaStaticConsumer07 = c1.loadClass(KAFKA_07_STATIC_CONSUMER_CLASS_NAME);
+            consumerConnector07 = c1.loadClass(KAFKA_07_CONSUMER_CONNECTOR_CLASS_NAME);
+            kafkaStream07 = c1.loadClass(KAFKA_07_CONSUMER_STREAM_CLASS_NAME);
+            topicFilter07 = c1.loadClass(KAFKA_07_TOPIC_FILTER_CLASS_NAME);
+            whiteList07 = c1.loadClass(KAFKA_07_WHITE_LIST_CLASS_NAME);
+            blackList07 = c1.loadClass(KAFKA_07_BLACK_LIST_CLASS_NAME);
+            kafkaMessageClass07 = c1.loadClass(KAFKA_07_MESSAGE_CLASS_NAME);
+            kafkaConsumerIteratorClass07 = c1.loadClass(KAFKA_07_CONSUMER_ITERATOR_CLASS_NAME);
+            kafkaMessageAndMetaDataClass07 = c1.loadClass(KAFKA_07_MESSAGE_AND_METADATA_CLASS_NAME);
+
+            Constructor consumerConfigConstructor07 = consumerConfig07.getConstructor(Properties.class);
+            Properties kafkaConsumerProperties07 = new Properties();
+            kafkaConsumerProperties07.load(new FileInputStream(consumerConfigFile07));
+            /** Disable shallow iteration because the message format is different between 07 and 08, we have to get each individual message **/
+            if (kafkaConsumerProperties07.getProperty("shallow.iterator.enable", "").equals("true")) {
+                log.warn("Shallow iterator should not be used in the migration tool");
+                kafkaConsumerProperties07.setProperty("shallow.iterator.enable", "false");
+            }
+            Object consumerConfig07 = consumerConfigConstructor07.newInstance(kafkaConsumerProperties07);
+
+            /** Construct the 07 consumer connector **/
+            Method consumerConnectorCreationMethod07 = kafkaStaticConsumer07.getMethod("createJavaConsumerConnector", KafkaMigrationTool.consumerConfig07);
+            final Object consumerConnector07 = consumerConnectorCreationMethod07.invoke(null, consumerConfig07);
+            Method consumerConnectorCreateMessageStreamsMethod07 = KafkaMigrationTool.consumerConnector07.getMethod(
+                "createMessageStreamsByFilter",
+                topicFilter07, int.class);
+            final Method consumerConnectorShutdownMethod07 = KafkaMigrationTool.consumerConnector07.getMethod("shutdown");
+            Constructor whiteListConstructor07 = whiteList07.getConstructor(String.class);
+            Constructor blackListConstructor07 = blackList07.getConstructor(String.class);
+            Object filterSpec = null;
+            if (options.has(whitelistOpt))
+                filterSpec = whiteListConstructor07.newInstance(options.valueOf(whitelistOpt));
+            else
+                filterSpec = blackListConstructor07.newInstance(options.valueOf(blacklistOpt));
+
+            Object retKafkaStreams = consumerConnectorCreateMessageStreamsMethod07.invoke(consumerConnector07, filterSpec, numConsumers);
+
+            Properties kafkaProducerProperties08 = new Properties();
+            kafkaProducerProperties08.load(new FileInputStream(producerConfigFile08));
+            kafkaProducerProperties08.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
+            // create a producer channel instead
+            int queueSize = options.valueOf(queueSizeOpt);
+            ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<byte[], byte[]>>(queueSize);
+            int threadId = 0;
+
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        consumerConnectorShutdownMethod07.invoke(consumerConnector07);
+                    } catch (Exception e) {
+                        log.error("Error while shutting down Kafka consumer", e);
+                    }
+                    for (MigrationThread migrationThread : migrationThreads) {
+                        migrationThread.shutdown();
+                    }
+                    for (ProducerThread producerThread : producerThreads) {
+                        producerThread.shutdown();
+                    }
+                    for (ProducerThread producerThread : producerThreads) {
+                        producerThread.awaitShutdown();
+                    }
+                    log.info("Kafka migration tool shutdown successfully");
+                }
+            });
+
+            // start consumer threads
+            for (Object stream : (List) retKafkaStreams) {
+                MigrationThread thread = new MigrationThread(stream, producerDataChannel, threadId);
+                threadId++;
+                thread.start();
+                migrationThreads.add(thread);
+            }
+
+            String clientId = kafkaProducerProperties08.getProperty("client.id");
+            // start producer threads
+            for (int i = 0; i < numProducers; i++) {
+                kafkaProducerProperties08.put("client.id", clientId + "-" + i);
+                ProducerConfig producerConfig08 = new ProducerConfig(kafkaProducerProperties08);
+                Producer producer = new Producer(producerConfig08);
+                ProducerThread producerThread = new ProducerThread(producerDataChannel, producer, i);
+                producerThread.start();
+                producerThreads.add(producerThread);
+            }
+        } catch (Throwable e) {
+            System.out.println("Kafka migration tool failed due to: " + Utils.stackTrace(e));
+            log.error("Kafka migration tool failed: ", e);
         }
-      });
-
-      // start consumer threads
-      for(Object stream: (List)retKafkaStreams) {
-        MigrationThread thread = new MigrationThread(stream, producerDataChannel, threadId);
-        threadId ++;
-        thread.start();
-        migrationThreads.add(thread);
-      }
-
-      String clientId = kafkaProducerProperties_08.getProperty("client.id");
-      // start producer threads
-      for (int i = 0; i < numProducers; i++) {
-        kafkaProducerProperties_08.put("client.id", clientId + "-" + i);
-        ProducerConfig producerConfig_08 = new ProducerConfig(kafkaProducerProperties_08);
-        Producer producer = new Producer(producerConfig_08);
-        ProducerThread producerThread = new ProducerThread(producerDataChannel, producer, i);
-        producerThread.start();
-        producerThreads.add(producerThread);
-      }
-    }
-    catch (Throwable e){
-      System.out.println("Kafka migration tool failed due to: " + Utils.stackTrace(e));
-      log.error("Kafka migration tool failed: ", e);
     }
-  }
-
-  private static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec[] required) throws IOException {
-    for(OptionSpec arg : required) {
-      if(!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"");
-        parser.printHelpOn(System.err);
-        System.exit(1);
-      }
+
+    private static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec[] required) throws IOException {
+        for (OptionSpec arg : required) {
+            if (!options.has(arg)) {
+                System.err.println("Missing required argument \"" + arg + "\"");
+                parser.printHelpOn(System.err);
+                System.exit(1);
+            }
+        }
     }
-  }
 
-  static class ProducerDataChannel<T> {
-    private final int producerQueueSize;
-    private final BlockingQueue<T> producerRequestQueue;
+    static class ProducerDataChannel<T> {
+        private final int producerQueueSize;
+        private final BlockingQueue<T> producerRequestQueue;
 
-    public ProducerDataChannel(int queueSize) {
-      producerQueueSize = queueSize;
-      producerRequestQueue = new ArrayBlockingQueue<T>(producerQueueSize);
-    }
+        public ProducerDataChannel(int queueSize) {
+            producerQueueSize = queueSize;
+            producerRequestQueue = new ArrayBlockingQueue<T>(producerQueueSize);
+        }
 
-    public void sendRequest(T data) throws InterruptedException {
-      producerRequestQueue.put(data);
-    }
+        public void sendRequest(T data) throws InterruptedException {
+            producerRequestQueue.put(data);
+        }
 
-    public T receiveRequest() throws InterruptedException {
-      return producerRequestQueue.take();
-    }
-  }
-
-  private static class MigrationThread extends Thread {
-    private final Object stream;
-    private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
-    private final int threadId;
-    private final String threadName;
-    private final org.apache.log4j.Logger logger;
-    private CountDownLatch shutdownComplete = new CountDownLatch(1);
-    private final AtomicBoolean isRunning = new AtomicBoolean(true);
-
-    MigrationThread(Object _stream, ProducerDataChannel<KeyedMessage<byte[], byte[]>> _producerDataChannel, int _threadId) {
-      stream = _stream;
-      producerDataChannel = _producerDataChannel;
-      threadId = _threadId;
-      threadName = "MigrationThread-" + threadId;
-      logger = org.apache.log4j.Logger.getLogger(MigrationThread.class.getName());
-      this.setName(threadName);
+        public T receiveRequest() throws InterruptedException {
+            return producerRequestQueue.take();
+        }
     }
 
-    public void run() {
-      try {
-        Method MessageGetPayloadMethod_07 = KafkaMessageClass_07.getMethod("payload");
-        Method KafkaGetMessageMethod_07 = KafkaMessageAndMetatDataClass_07.getMethod("message");
-        Method KafkaGetTopicMethod_07 = KafkaMessageAndMetatDataClass_07.getMethod("topic");
-        Method ConsumerIteratorMethod = KafkaStream_07.getMethod("iterator");
-        Method KafkaStreamHasNextMethod_07 = KafkaConsumerIteratorClass_07.getMethod("hasNext");
-        Method KafkaStreamNextMethod_07 = KafkaConsumerIteratorClass_07.getMethod("next");
-        Object iterator = ConsumerIteratorMethod.invoke(stream);
-
-        while (((Boolean) KafkaStreamHasNextMethod_07.invoke(iterator)).booleanValue()) {
-          Object messageAndMetaData_07 = KafkaStreamNextMethod_07.invoke(iterator);
-          Object message_07 = KafkaGetMessageMethod_07.invoke(messageAndMetaData_07);
-          Object topic = KafkaGetTopicMethod_07.invoke(messageAndMetaData_07);
-          Object payload_07 = MessageGetPayloadMethod_07.invoke(message_07);
-          int size = ((ByteBuffer)payload_07).remaining();
-          byte[] bytes = new byte[size];
-          ((ByteBuffer)payload_07).get(bytes);
-          if(logger.isDebugEnabled())
-            logger.debug("Migration thread " + threadId + " sending message of size " + bytes.length + " to topic "+ topic);
-          KeyedMessage<byte[], byte[]> producerData = new KeyedMessage((String)topic, null, bytes);
-          producerDataChannel.sendRequest(producerData);
+    private static class MigrationThread extends Thread {
+        private final Object stream;
+        private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
+        private final int threadId;
+        private final String threadName;
+        private final org.apache.log4j.Logger logger;
+        private CountDownLatch shutdownComplete = new CountDownLatch(1);
+        private final AtomicBoolean isRunning = new AtomicBoolean(true);
+
+        MigrationThread(Object stream, ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel, int threadId) {
+            this.stream = stream;
+            this.producerDataChannel = producerDataChannel;
+            this.threadId = threadId;
+            threadName = "MigrationThread-" + threadId;
+            logger = org.apache.log4j.Logger.getLogger(MigrationThread.class.getName());
+            this.setName(threadName);
         }
-        logger.info("Migration thread " + threadName + " finished running");
-      } catch (InvocationTargetException t){
-        logger.fatal("Migration thread failure due to root cause ", t.getCause());
-      } catch (Throwable t){
-        logger.fatal("Migration thread failure due to ", t);
-      } finally {
-        shutdownComplete.countDown();
-      }
-    }
 
-    public void shutdown() {
-      logger.info("Migration thread " + threadName + " shutting down");
-      isRunning.set(false);
-      interrupt();
-      try {
-        shutdownComplete.await();
-      } catch(InterruptedException ie) {
-        logger.warn("Interrupt during shutdown of MigrationThread", ie);
-      }
-      logger.info("Migration thread " + threadName + " shutdown complete");
-    }
-  }
-
-  static class ProducerThread extends Thread {
-    private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
-    private final Producer<byte[], byte[]> producer;
-    private final int threadId;
-    private String threadName;
-    private org.apache.log4j.Logger logger;
-    private CountDownLatch shutdownComplete = new CountDownLatch(1);
-    private KeyedMessage<byte[], byte[]> shutdownMessage = new KeyedMessage("shutdown", null, null);
-
-    public ProducerThread(ProducerDataChannel<KeyedMessage<byte[], byte[]>> _producerDataChannel,
-                          Producer<byte[], byte[]> _producer,
-                          int _threadId) {
-      producerDataChannel = _producerDataChannel;
-      producer = _producer;
-      threadId = _threadId;
-      threadName = "ProducerThread-" + threadId;
-      logger = org.apache.log4j.Logger.getLogger(ProducerThread.class.getName());
-      this.setName(threadName);
-    }
+        public void run() {
+            try {
+                Method messageGetPayloadMethod07 = kafkaMessageClass07.getMethod("payload");
+                Method kafkaGetMessageMethod07 = kafkaMessageAndMetaDataClass07.getMethod("message");
+                Method kafkaGetTopicMethod07 = kafkaMessageAndMetaDataClass07.getMethod("topic");
+                Method consumerIteratorMethod = kafkaStream07.getMethod("iterator");
+                Method kafkaStreamHasNextMethod07 = kafkaConsumerIteratorClass07.getMethod("hasNext");
+                Method kafkaStreamNextMethod07 = kafkaConsumerIteratorClass07.getMethod("next");
+                Object iterator = consumerIteratorMethod.invoke(stream);
+
+                while (((Boolean) kafkaStreamHasNextMethod07.invoke(iterator)).booleanValue()) {
+                    Object messageAndMetaData07 = kafkaStreamNextMethod07.invoke(iterator);
+                    Object message07 = kafkaGetMessageMethod07.invoke(messageAndMetaData07);
+                    Object topic = kafkaGetTopicMethod07.invoke(messageAndMetaData07);
+                    Object payload07 = messageGetPayloadMethod07.invoke(message07);
+                    int size = ((ByteBuffer) payload07).remaining();
+                    byte[] bytes = new byte[size];
+                    ((ByteBuffer) payload07).get(bytes);
+                    if (logger.isDebugEnabled())
+                        logger.debug("Migration thread " + threadId + " sending message of size " + bytes.length + " to topic " + topic);
+                    KeyedMessage<byte[], byte[]> producerData = new KeyedMessage((String) topic, null, bytes);
+                    producerDataChannel.sendRequest(producerData);
+                }
+                logger.info("Migration thread " + threadName + " finished running");
+            } catch (InvocationTargetException t) {
+                logger.fatal("Migration thread failure due to root cause ", t.getCause());
+            } catch (Throwable t) {
+                logger.fatal("Migration thread failure due to ", t);
+            } finally {
+                shutdownComplete.countDown();
+            }
+        }
 
-    public void run() {
-      try{
-        while(true) {
-          KeyedMessage<byte[], byte[]> data = producerDataChannel.receiveRequest();
-          if(!data.equals(shutdownMessage)) {
-            producer.send(data);
-            if(logger.isDebugEnabled()) logger.debug(String.format("Sending message %s", new String(data.message())));
-          }
-          else
-            break;
+        public void shutdown() {
+            logger.info("Migration thread " + threadName + " shutting down");
+            isRunning.set(false);
+            interrupt();
+            try {
+                shutdownComplete.await();
+            } catch (InterruptedException ie) {
+                logger.warn("Interrupt during shutdown of MigrationThread", ie);
+            }
+            logger.info("Migration thread " + threadName + " shutdown complete");
         }
-        logger.info("Producer thread " + threadName + " finished running");
-      } catch (Throwable t){
-        logger.fatal("Producer thread failure due to ", t);
-      } finally {
-        shutdownComplete.countDown();
-      }
     }
 
-    public void shutdown() {
-      try {
-        logger.info("Producer thread " + threadName + " shutting down");
-        producerDataChannel.sendRequest(shutdownMessage);
-      } catch(InterruptedException ie) {
-        logger.warn("Interrupt during shutdown of ProducerThread", ie);
-      }
-    }
+    static class ProducerThread extends Thread {
+        private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
+        private final Producer<byte[], byte[]> producer;
+        private final int threadId;
+        private String threadName;
+        private org.apache.log4j.Logger logger;
+        private CountDownLatch shutdownComplete = new CountDownLatch(1);
+        private KeyedMessage<byte[], byte[]> shutdownMessage = new KeyedMessage("shutdown", null, null);
+
+        public ProducerThread(ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel,
+                              Producer<byte[], byte[]> producer,
+                              int threadId) {
+            this.producerDataChannel = producerDataChannel;
+            this.producer = producer;
+            this.threadId = threadId;
+            threadName = "ProducerThread-" + threadId;
+            logger = org.apache.log4j.Logger.getLogger(ProducerThread.class.getName());
+            this.setName(threadName);
+        }
 
-    public void awaitShutdown() {
-      try {
-        shutdownComplete.await();
-        producer.close();
-        logger.info("Producer thread " + threadName + " shutdown complete");
-      } catch(InterruptedException ie) {
-        logger.warn("Interrupt during shutdown of ProducerThread", ie);
-      }
-    }
-  }
+        public void run() {
+            try {
+                while (true) {
+                    KeyedMessage<byte[], byte[]> data = producerDataChannel.receiveRequest();
+                    if (!data.equals(shutdownMessage)) {
+                        producer.send(data);
+                        if (logger.isDebugEnabled())
+                            logger.debug(String.format("Sending message %s", new String(data.message())));
+                    } else
+                        break;
+                }
+                logger.info("Producer thread " + threadName + " finished running");
+            } catch (Throwable t) {
+                logger.fatal("Producer thread failure due to ", t);
+            } finally {
+                shutdownComplete.countDown();
+            }
+        }
 
-  /**
-   * A parent-last class loader that will try the child class loader first and then the parent.
-   * This takes a fair bit of doing because java really prefers parent-first.
-   */
-  private static class ParentLastURLClassLoader extends ClassLoader {
-    private ChildURLClassLoader childClassLoader;
+        public void shutdown() {
+            try {
+                logger.info("Producer thread " + threadName + " shutting down");
+                producerDataChannel.sendRequest(shutdownMessage);
+            } catch (InterruptedException ie) {
+                logger.warn("Interrupt during shutdown of ProducerThread", ie);
+            }
+        }
 
-    /**
-     * This class allows me to call findClass on a class loader
-     */
-    private static class FindClassClassLoader extends ClassLoader {
-      public FindClassClassLoader(ClassLoader parent) {
-        super(parent);
-      }
-      @Override
-      public Class<?> findClass(String name) throws ClassNotFoundException {
-        return super.findClass(name);
-      }
+        public void awaitShutdown() {
+            try {
+                shutdownComplete.await();
+                producer.close();
+                logger.info("Producer thread " + threadName + " shutdown complete");
+            } catch (InterruptedException ie) {
+                logger.warn("Interrupt during shutdown of ProducerThread", ie);
+            }
+        }
     }
 
     /**
-     * This class delegates (child then parent) for the findClass method for a URLClassLoader.
-     * We need this because findClass is protected in URLClassLoader
+     * A parent-last class loader that will try the child class loader first and then the parent.
+     * This takes a fair bit of doing because java really prefers parent-first.
      */
-    private static class ChildURLClassLoader extends URLClassLoader {
-      private FindClassClassLoader realParent;
-      public ChildURLClassLoader( URL[] urls, FindClassClassLoader realParent) {
-        super(urls, null);
-        this.realParent = realParent;
-      }
-
-      @Override
-      public Class<?> findClass(String name) throws ClassNotFoundException {
-        try{
-          // first try to use the URLClassLoader findClass
-          return super.findClass(name);
+    private static class ParentLastURLClassLoader extends ClassLoader {
+        private ChildURLClassLoader childClassLoader;
+
+        /**
+         * This class allows me to call findClass on a class loader
+         */
+        private static class FindClassClassLoader extends ClassLoader {
+            public FindClassClassLoader(ClassLoader parent) {
+                super(parent);
+            }
+
+            @Override
+            public Class<?> findClass(String name) throws ClassNotFoundException {
+                return super.findClass(name);
+            }
         }
-        catch( ClassNotFoundException e ) {
-          // if that fails, we ask our real parent class loader to load the class (we give up)
-          return realParent.loadClass(name);
+
+        /**
+         * This class delegates (child then parent) for the findClass method for a URLClassLoader.
+         * We need this because findClass is protected in URLClassLoader
+         */
+        private static class ChildURLClassLoader extends URLClassLoader {
+            private FindClassClassLoader realParent;
+
+            public ChildURLClassLoader(URL[] urls, FindClassClassLoader realParent) {
+                super(urls, null);
+                this.realParent = realParent;
+            }
+
+            @Override
+            public Class<?> findClass(String name) throws ClassNotFoundException {
+                try {
+                    // first try to use the URLClassLoader findClass
+                    return super.findClass(name);
+                } catch (ClassNotFoundException e) {
+                    // if that fails, we ask our real parent class loader to load the class (we give up)
+                    return realParent.loadClass(name);
+                }
+            }
         }
-      }
-    }
 
-    public ParentLastURLClassLoader(URL[] urls) {
-      super(Thread.currentThread().getContextClassLoader());
-      childClassLoader = new ChildURLClassLoader(urls, new FindClassClassLoader(this.getParent()));
-    }
+        public ParentLastURLClassLoader(URL[] urls) {
+            super(Thread.currentThread().getContextClassLoader());
+            childClassLoader = new ChildURLClassLoader(urls, new FindClassClassLoader(this.getParent()));
+        }
 
-    @Override
-    protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
-      try {
-        // first we try to find a class inside the child class loader
-        return childClassLoader.findClass(name);
-      }
-      catch( ClassNotFoundException e ) {
-        // didn't find it, try the parent
-        return super.loadClass(name, resolve);
-      }
+        @Override
+        protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+            try {
+                // first we try to find a class inside the child class loader
+                return childClassLoader.findClass(name);
+            } catch (ClassNotFoundException e) {
+                // didn't find it, try the parent
+                return super.loadClass(name, resolve);
+            }
+        }
     }
-  }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/64b746bd/examples/src/main/java/kafka/examples/Consumer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java
index 3bb93ee..2a0c59a 100644
--- a/examples/src/main/java/kafka/examples/Consumer.java
+++ b/examples/src/main/java/kafka/examples/Consumer.java
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -16,52 +16,50 @@
  */
 package kafka.examples;
 
-import java.util.Collections;
-import java.util.Properties;
-
 import kafka.utils.ShutdownableThread;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 
-public class Consumer extends ShutdownableThread
-{
-  private final KafkaConsumer<Integer, String> consumer;
-  private final String topic;
+import java.util.Collections;
+import java.util.Properties;
+
+public class Consumer extends ShutdownableThread {
+    private final KafkaConsumer<Integer, String> consumer;
+    private final String topic;
 
-  public Consumer(String topic)
-  {
-    super("KafkaConsumerExample", false);
-    Properties props = new Properties();
-    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-    props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
-    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
-    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+    public Consumer(String topic) {
+        super("KafkaConsumerExample", false);
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
 
-    consumer = new KafkaConsumer<>(props);
-    this.topic = topic;
-  }
+        consumer = new KafkaConsumer<>(props);
+        this.topic = topic;
+    }
 
-  @Override
-  public void doWork() {
-    consumer.subscribe(Collections.singletonList(this.topic));
-    ConsumerRecords<Integer, String> records = consumer.poll(1000);
-    for (ConsumerRecord<Integer, String> record : records) {
-      System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
+    @Override
+    public void doWork() {
+        consumer.subscribe(Collections.singletonList(this.topic));
+        ConsumerRecords<Integer, String> records = consumer.poll(1000);
+        for (ConsumerRecord<Integer, String> record : records) {
+            System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
+        }
     }
-  }
 
-  @Override
-  public String name() {
-    return null;
-  }
+    @Override
+    public String name() {
+        return null;
+    }
 
-  @Override
-  public boolean isInterruptible() {
-    return false;
-  }
-}
\ No newline at end of file
+    @Override
+    public boolean isInterruptible() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/64b746bd/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
index e96991a..e732d5c 100644
--- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -16,16 +16,14 @@
  */
 package kafka.examples;
 
-public class KafkaConsumerProducerDemo implements KafkaProperties
-{
-  public static void main(String[] args)
-  {
-    final boolean isAsync = args.length > 0 ? !args[0].trim().toLowerCase().equals("sync") : true;
-    Producer producerThread = new Producer(KafkaProperties.topic, isAsync);
-    producerThread.start();
+public class KafkaConsumerProducerDemo implements KafkaProperties {
+    public static void main(String[] args) {
+        final boolean isAsync = args.length > 0 ? !args[0].trim().toLowerCase().equals("sync") : true;
+        Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync);
+        producerThread.start();
+
+        Consumer consumerThread = new Consumer(KafkaProperties.TOPIC);
+        consumerThread.start();
 
-    Consumer consumerThread = new Consumer(KafkaProperties.topic);
-    consumerThread.start();
-    
-  }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/64b746bd/examples/src/main/java/kafka/examples/KafkaProperties.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/kafka/examples/KafkaProperties.java b/examples/src/main/java/kafka/examples/KafkaProperties.java
index 9d1cd31..b57e1bd 100644
--- a/examples/src/main/java/kafka/examples/KafkaProperties.java
+++ b/examples/src/main/java/kafka/examples/KafkaProperties.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,17 +16,16 @@
  */
 package kafka.examples;
 
-public interface KafkaProperties
-{
-  final static String zkConnect = "127.0.0.1:2181";
-  final static  String groupId = "group1";
-  final static String topic = "topic1";
-  final static String kafkaServerURL = "localhost";
-  final static int kafkaServerPort = 9092;
-  final static int kafkaProducerBufferSize = 64*1024;
-  final static int connectionTimeOut = 100000;
-  final static int reconnectInterval = 10000;
-  final static String topic2 = "topic2";
-  final static String topic3 = "topic3";
-  final static String clientId = "SimpleConsumerDemoClient";
+public interface KafkaProperties {
+    String ZK_CONNECT = "127.0.0.1:2181";
+    String GROUP_ID = "group1";
+    String TOPIC = "topic1";
+    String KAFKA_SERVER_URL = "localhost";
+    int KAFKA_SERVER_PORT = 9092;
+    int KAFKA_PRODUCER_BUFFER_SIZE = 64 * 1024;
+    int CONNECTION_TIMEOUT = 100000;
+    int RECONNECT_INTERVAL = 10000;
+    String TOPIC2 = "topic2";
+    String TOPIC3 = "topic3";
+    String CLIENT_ID = "SimpleConsumerDemoClient";
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/64b746bd/examples/src/main/java/kafka/examples/Producer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java
index ccc9925..393bf1e 100644
--- a/examples/src/main/java/kafka/examples/Producer.java
+++ b/examples/src/main/java/kafka/examples/Producer.java
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -16,90 +16,86 @@
  */
 package kafka.examples;
 
-
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 
-public class Producer extends Thread
-{
-  private final KafkaProducer<Integer, String> producer;
-  private final String topic;
-  private final Boolean isAsync;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+public class Producer extends Thread {
+    private final KafkaProducer<Integer, String> producer;
+    private final String topic;
+    private final Boolean isAsync;
 
-  public Producer(String topic, Boolean isAsync)
-  {
-    Properties props = new Properties();
-    props.put("bootstrap.servers", "localhost:9092");
-    props.put("client.id", "DemoProducer");
-    props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
-    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-    producer = new KafkaProducer<Integer, String>(props);
-    this.topic = topic;
-    this.isAsync = isAsync;
-  }
+    public Producer(String topic, Boolean isAsync) {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", "localhost:9092");
+        props.put("client.id", "DemoProducer");
+        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        producer = new KafkaProducer<Integer, String>(props);
+        this.topic = topic;
+        this.isAsync = isAsync;
+    }
 
-  public void run() {
-    int messageNo = 1;
-    while(true)
-    {
-      String messageStr = "Message_" + messageNo;
-      long startTime = System.currentTimeMillis();
-      if (isAsync) { // Send asynchronously
-        producer.send(new ProducerRecord<Integer, String>(topic,
-            messageNo,
-            messageStr), new DemoCallBack(startTime, messageNo, messageStr));
-      } else { // Send synchronously
-        try {
-          producer.send(new ProducerRecord<Integer, String>(topic,
-              messageNo,
-              messageStr)).get();
-          System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
-        } catch (InterruptedException e) {
-          e.printStackTrace();
-        } catch (ExecutionException e) {
-          e.printStackTrace();
+    public void run() {
+        int messageNo = 1;
+        while (true) {
+            String messageStr = "Message_" + messageNo;
+            long startTime = System.currentTimeMillis();
+            if (isAsync) { // Send asynchronously
+                producer.send(new ProducerRecord<Integer, String>(topic,
+                    messageNo,
+                    messageStr), new DemoCallBack(startTime, messageNo, messageStr));
+            } else { // Send synchronously
+                try {
+                    producer.send(new ProducerRecord<Integer, String>(topic,
+                        messageNo,
+                        messageStr)).get();
+                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                } catch (ExecutionException e) {
+                    e.printStackTrace();
+                }
+            }
+            ++messageNo;
         }
-      }
-      ++messageNo;
     }
-  }
 }
 
 class DemoCallBack implements Callback {
 
-  private long startTime;
-  private int key;
-  private String message;
+    private long startTime;
+    private int key;
+    private String message;
 
-  public DemoCallBack(long startTime, int key, String message) {
-    this.startTime = startTime;
-    this.key = key;
-    this.message = message;
-  }
+    public DemoCallBack(long startTime, int key, String message) {
+        this.startTime = startTime;
+        this.key = key;
+        this.message = message;
+    }
 
-  /**
-   * A callback method the user can implement to provide asynchronous handling of request completion. This method will
-   * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
-   * non-null.
-   *
-   * @param metadata  The metadata for the record that was sent (i.e. the partition and offset). Null if an error
-   *                  occurred.
-   * @param exception The exception thrown during processing of this record. Null if no error occurred.
-   */
-  public void onCompletion(RecordMetadata metadata, Exception exception) {
-    long elapsedTime = System.currentTimeMillis() - startTime;
-    if (metadata != null) {
-      System.out.println(
-          "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
-              "), " +
-              "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
-    } else {
-      exception.printStackTrace();
+    /**
+     * A callback method the user can implement to provide asynchronous handling of request completion. This method will
+     * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
+     * non-null.
+     *
+     * @param metadata  The metadata for the record that was sent (i.e. the partition and offset). Null if an error
+     *                  occurred.
+     * @param exception The exception thrown during processing of this record. Null if no error occurred.
+     */
+    public void onCompletion(RecordMetadata metadata, Exception exception) {
+        long elapsedTime = System.currentTimeMillis() - startTime;
+        if (metadata != null) {
+            System.out.println(
+                "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
+                    "), " +
+                    "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
+        } else {
+            exception.printStackTrace();
+        }
     }
-  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/64b746bd/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
index c43b461..1c56867 100644
--- a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
+++ b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -19,74 +19,74 @@ package kafka.examples;
 import kafka.api.FetchRequest;
 import kafka.api.FetchRequestBuilder;
 import kafka.javaapi.FetchResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.MessageAndOffset;
 
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
-import java.util.List;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.MessageAndOffset;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class SimpleConsumerDemo {
-    
-  private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
-    for(MessageAndOffset messageAndOffset: messageSet) {
-      ByteBuffer payload = messageAndOffset.message().payload();
-      byte[] bytes = new byte[payload.limit()];
-      payload.get(bytes);
-      System.out.println(new String(bytes, "UTF-8"));
+
+    private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
+        for (MessageAndOffset messageAndOffset : messageSet) {
+            ByteBuffer payload = messageAndOffset.message().payload();
+            byte[] bytes = new byte[payload.limit()];
+            payload.get(bytes);
+            System.out.println(new String(bytes, "UTF-8"));
+        }
     }
-  }
 
-  private static void generateData() {
-    Producer producer2 = new Producer(KafkaProperties.topic2, false);
-    producer2.start();
-    Producer producer3 = new Producer(KafkaProperties.topic3, false);
-    producer3.start();
-    try {
-      Thread.sleep(1000);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
+    private static void generateData() {
+        Producer producer2 = new Producer(KafkaProperties.TOPIC2, false);
+        producer2.start();
+        Producer producer3 = new Producer(KafkaProperties.TOPIC3, false);
+        producer3.start();
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
     }
-  }
-  
-  public static void main(String[] args) throws Exception {
-    generateData();
-      
-    SimpleConsumer simpleConsumer = new SimpleConsumer(KafkaProperties.kafkaServerURL,
-                                                       KafkaProperties.kafkaServerPort,
-                                                       KafkaProperties.connectionTimeOut,
-                                                       KafkaProperties.kafkaProducerBufferSize,
-                                                       KafkaProperties.clientId);
 
-    System.out.println("Testing single fetch");
-    FetchRequest req = new FetchRequestBuilder()
-            .clientId(KafkaProperties.clientId)
-            .addFetch(KafkaProperties.topic2, 0, 0L, 100)
+    public static void main(String[] args) throws Exception {
+        generateData();
+
+        SimpleConsumer simpleConsumer = new SimpleConsumer(KafkaProperties.KAFKA_SERVER_URL,
+            KafkaProperties.KAFKA_SERVER_PORT,
+            KafkaProperties.CONNECTION_TIMEOUT,
+            KafkaProperties.KAFKA_PRODUCER_BUFFER_SIZE,
+            KafkaProperties.CLIENT_ID);
+
+        System.out.println("Testing single fetch");
+        FetchRequest req = new FetchRequestBuilder()
+            .clientId(KafkaProperties.CLIENT_ID)
+            .addFetch(KafkaProperties.TOPIC2, 0, 0L, 100)
             .build();
-    FetchResponse fetchResponse = simpleConsumer.fetch(req);
-      printMessages(fetchResponse.messageSet(KafkaProperties.topic2, 0));
+        FetchResponse fetchResponse = simpleConsumer.fetch(req);
+        printMessages(fetchResponse.messageSet(KafkaProperties.TOPIC2, 0));
 
-    System.out.println("Testing single multi-fetch");
-    Map<String, List<Integer>> topicMap = new HashMap<String, List<Integer>>();
-    topicMap.put(KafkaProperties.topic2, Collections.singletonList(0));
-    topicMap.put(KafkaProperties.topic3, Collections.singletonList(0));
-    req = new FetchRequestBuilder()
-            .clientId(KafkaProperties.clientId)
-            .addFetch(KafkaProperties.topic2, 0, 0L, 100)
-            .addFetch(KafkaProperties.topic3, 0, 0L, 100)
+        System.out.println("Testing single multi-fetch");
+        Map<String, List<Integer>> topicMap = new HashMap<String, List<Integer>>();
+        topicMap.put(KafkaProperties.TOPIC2, Collections.singletonList(0));
+        topicMap.put(KafkaProperties.TOPIC3, Collections.singletonList(0));
+        req = new FetchRequestBuilder()
+            .clientId(KafkaProperties.CLIENT_ID)
+            .addFetch(KafkaProperties.TOPIC2, 0, 0L, 100)
+            .addFetch(KafkaProperties.TOPIC3, 0, 0L, 100)
             .build();
-    fetchResponse = simpleConsumer.fetch(req);
-    int fetchReq = 0;
-    for ( Map.Entry<String, List<Integer>> entry : topicMap.entrySet() ) {
-      String topic = entry.getKey();
-      for ( Integer offset : entry.getValue()) {
-        System.out.println("Response from fetch request no: " + ++fetchReq);
-        printMessages(fetchResponse.messageSet(topic, offset));
-      }
+        fetchResponse = simpleConsumer.fetch(req);
+        int fetchReq = 0;
+        for (Map.Entry<String, List<Integer>> entry : topicMap.entrySet()) {
+            String topic = entry.getKey();
+            for (Integer offset : entry.getValue()) {
+                System.out.println("Response from fetch request no: " + ++fetchReq);
+                printMessages(fetchResponse.messageSet(topic, offset));
+            }
+        }
     }
-  }
 }