You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/02/24 23:27:52 UTC

git commit: KAFKA-734 Migration tool needs a revamp, it was poorly written and has many performance bugs; reviewed by Jun Rao

Updated Branches:
  refs/heads/0.8 ccfdabc04 -> d925b157f


KAFKA-734 Migration tool needs a revamp, it was poorly written and has many performance bugs; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d925b157
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d925b157
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d925b157

Branch: refs/heads/0.8
Commit: d925b157f42b13cb410a86e8850c23a11de3d2f1
Parents: ccfdabc
Author: Neha Narkhede <ne...@gmail.com>
Authored: Sun Feb 24 14:27:21 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Sun Feb 24 14:27:21 2013 -0800

----------------------------------------------------------------------
 .../consumer/ZookeeperConsumerConnector.scala      |    1 +
 core/src/main/scala/kafka/producer/Producer.scala  |    3 +-
 .../main/scala/kafka/tools/KafkaMigrationTool.java |  364 ++++++++++-----
 core/src/main/scala/kafka/utils/Utils.scala        |    2 +-
 .../test/scala/unit/kafka/utils/UtilsTest.scala    |    5 +-
 .../scala/kafka/perf/ProducerPerformance.scala     |    2 +-
 6 files changed, 244 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d925b157/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 9db9a8b..c9e4127 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -646,6 +646,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       val topicThreadId = e._1
       val q = e._2._1
       topicThreadIdAndQueues.put(topicThreadId, q)
+      debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString))
       newGauge(
         config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
         new Gauge[Int] {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d925b157/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index f7d85b9..c837091 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -40,8 +40,7 @@ class Producer[K,V](val config: ProducerConfig,
     case "sync" =>
     case "async" =>
       sync = false
-      val asyncProducerID = random.nextInt(Int.MaxValue)
-      producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + asyncProducerID, 
+      producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
                                                        queue,
                                                        eventHandler, 
                                                        config.queueBufferingMaxMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d925b157/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index dbbddae..f3a5095 100644
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -22,13 +22,12 @@ import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 import kafka.utils.Utils;
-import scala.collection.Iterator;
-import scala.collection.JavaConversions;
 
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -36,27 +35,28 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 
 /**
- * The kafka 07 to 08 online migration tool, it's used for migrating data from 07 to 08 cluster. Internally,
- * it's composed of a kafka 07 consumer and kafka 08 producer. The kafka 07 consumer consumes data from the
- * 07 cluster, and the kafka 08 producer produces data to the 08 cluster.
+ * This is a  kafka 0.7 to 0.8 online migration tool used for migrating data from 0.7 to 0.8 cluster. Internally,
+ * it's composed of a kafka 0.7 consumer and kafka 0.8 producer. The kafka 0.7 consumer consumes data from the
+ * 0.7 cluster, and the kafka 0.8 producer produces data to the 0.8 cluster.
  *
- * The 07 consumer is loaded from kafka 07 jar using a "parent last, child first" java class loader.
- * Ordinary class loader is "parent first, child last", and kafka 08 and 07 both have classes for a lot of
- * class names like "kafka.consumer.Consumer", etc., so ordinary java URLClassLoader with kafka 07 jar will
- * will still load the 08 version class.
+ * The 0.7 consumer is loaded from kafka 0.7 jar using a "parent last, child first" java class loader.
+ * Ordinary class loader is "parent first, child last", and kafka 0.8 and 0.7 both have classes for a lot of
+ * class names like "kafka.consumer.Consumer", etc., so ordinary java URLClassLoader with kafka 0.7 jar will
+ * will still load the 0.8 version class.
  *
- * As kafka 07 and kafka 08 used different version of zkClient, the zkClient jar used by kafka 07 should
+ * As kafka 0.7 and kafka 0.8 used different version of zkClient, the zkClient jar used by kafka 0.7 should
  * also be used by the class loader.
  *
- * The user need to provide the configuration file for 07 consumer and 08 producer. For 08 producer,
- * the "serializer.class" filed is set to "kafka.serializer.DefaultEncode" by the code.
+ * The user need to provide the configuration file for 0.7 consumer and 0.8 producer. For 0.8 producer,
+ * the "serializer.class" config is set to "kafka.serializer.DefaultEncoder" by the code.
  */
 
-public class KafkaMigrationTool
-{
+public class KafkaMigrationTool {
   private static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName());
   private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "kafka.consumer.Consumer";
   private static final String KAFKA_07_CONSUMER_CONFIG_CLASS_NAME = "kafka.consumer.ConsumerConfig";
@@ -80,90 +80,91 @@ public class KafkaMigrationTool
   private static Class<?> KafkaMessageAndMetatDataClass_07 = null;
   private static Class<?> KafkaMessageClass_07 = null;
 
-  public static void main(String[] args){
+  public static void main(String[] args) throws InterruptedException, IOException {
     OptionParser parser = new OptionParser();
     ArgumentAcceptingOptionSpec<String> consumerConfigOpt
-        = parser.accepts("consumer.config", "Kafka 0.7 consumer config to consume from the source cluster. " + "You man specify multiple of these.")
-        .withRequiredArg()
-        .describedAs("config file")
-        .ofType(String.class);
+      = parser.accepts("consumer.config", "Kafka 0.7 consumer config to consume from the source 0.7 cluster. " + "You man specify multiple of these.")
+      .withRequiredArg()
+      .describedAs("config file")
+      .ofType(String.class);
 
     ArgumentAcceptingOptionSpec<String> producerConfigOpt
-        =  parser.accepts("producer.config", "Embedded producer config.")
-        .withRequiredArg()
-        .describedAs("config file")
-        .ofType(String.class);
+      =  parser.accepts("producer.config", "Producer config.")
+      .withRequiredArg()
+      .describedAs("config file")
+      .ofType(String.class);
 
     ArgumentAcceptingOptionSpec<Integer> numProducersOpt
-        =  parser.accepts("num.producers", "Number of producer instances")
-        .withRequiredArg()
-        .describedAs("Number of producers")
-        .ofType(Integer.class)
-        .defaultsTo(1);
+      =  parser.accepts("num.producers", "Number of producer instances")
+      .withRequiredArg()
+      .describedAs("Number of producers")
+      .ofType(Integer.class)
+      .defaultsTo(1);
 
     ArgumentAcceptingOptionSpec<String> zkClient01JarOpt
-        = parser.accepts("zkclient.01.jar", "zkClient 0.1 jar file")
-        .withRequiredArg()
-        .describedAs("zkClient 0.1 jar file required by Kafka 0.7")
-        .ofType(String.class);
+      = parser.accepts("zkclient.01.jar", "zkClient 0.1 jar file")
+      .withRequiredArg()
+      .describedAs("zkClient 0.1 jar file required by Kafka 0.7")
+      .ofType(String.class);
 
     ArgumentAcceptingOptionSpec<String> kafka07JarOpt
-        = parser.accepts("kafka.07.jar", "Kafka 0.7 jar file")
-        .withRequiredArg()
-        .describedAs("kafka 0.7 jar")
-        .ofType(String.class);
+      = parser.accepts("kafka.07.jar", "Kafka 0.7 jar file")
+      .withRequiredArg()
+      .describedAs("kafka 0.7 jar")
+      .ofType(String.class);
 
     ArgumentAcceptingOptionSpec<Integer> numStreamsOpt
-        = parser.accepts("num.streams", "Number of consumption streams.")
-        .withRequiredArg()
-        .describedAs("Number of threads")
-        .ofType(Integer.class)
-        .defaultsTo(1);
+      = parser.accepts("num.streams", "Number of consumer streams")
+      .withRequiredArg()
+      .describedAs("Number of consumer threads")
+      .ofType(Integer.class)
+      .defaultsTo(1);
 
     ArgumentAcceptingOptionSpec<String> whitelistOpt
-        = parser.accepts("whitelist", "Whitelist of topics to mirror.")
-        .withRequiredArg()
-        .describedAs("Java regex (String)")
-        .ofType(String.class);
+      = parser.accepts("whitelist", "Whitelist of topics to migrate from the 0.7 cluster")
+      .withRequiredArg()
+      .describedAs("Java regex (String)")
+      .ofType(String.class);
 
     ArgumentAcceptingOptionSpec<String> blacklistOpt
-        = parser.accepts("blacklist", "Blacklist of topics to mirror.")
-        .withRequiredArg()
-        .describedAs("Java regex (String)")
-        .ofType(String.class);
+      = parser.accepts("blacklist", "Blacklist of topics to migrate from the 0.7 cluster")
+      .withRequiredArg()
+      .describedAs("Java regex (String)")
+      .ofType(String.class);
 
     OptionSpecBuilder helpOpt
-        = parser.accepts("help", "Print this message.");
+      = parser.accepts("help", "Print this message.");
 
     OptionSet options = parser.parse(args);
 
-    try{
-      if (options.has(helpOpt)){
-        parser.printHelpOn(System.out);
-        System.exit(0);
-      }
-
-      checkRequiredArgs(parser, options, new OptionSpec[]{consumerConfigOpt, producerConfigOpt, zkClient01JarOpt, kafka07JarOpt});
-      int whiteListCount = options.has(whitelistOpt) ? 1 : 0;
-      int blackListCount = options.has(blacklistOpt) ? 1 : 0;
-      if(whiteListCount + blackListCount != 1){
-        System.err.println("Exactly one of whitelist or blacklist is required.");
-        System.exit(1);
-      }
+    if (options.has(helpOpt)) {
+      parser.printHelpOn(System.out);
+      System.exit(0);
+    }
 
-      String kafkaJarFile_07 = options.valueOf(kafka07JarOpt);
-      String zkClientJarFile = options.valueOf(zkClient01JarOpt);
-      String consumerConfigFile_07 = options.valueOf(consumerConfigOpt);
-      int numStreams = options.valueOf(numStreamsOpt);
-      String producerConfigFile_08 = options.valueOf(producerConfigOpt);
-      int numProducers = options.valueOf(numProducersOpt);
+    checkRequiredArgs(parser, options, new OptionSpec[]{consumerConfigOpt, producerConfigOpt, zkClient01JarOpt, kafka07JarOpt});
+    int whiteListCount = options.has(whitelistOpt) ? 1 : 0;
+    int blackListCount = options.has(blacklistOpt) ? 1 : 0;
+    if(whiteListCount + blackListCount != 1) {
+      System.err.println("Exactly one of whitelist or blacklist is required.");
+      System.exit(1);
+    }
 
+    String kafkaJarFile_07 = options.valueOf(kafka07JarOpt);
+    String zkClientJarFile = options.valueOf(zkClient01JarOpt);
+    String consumerConfigFile_07 = options.valueOf(consumerConfigOpt);
+    int numConsumers = options.valueOf(numStreamsOpt);
+    String producerConfigFile_08 = options.valueOf(producerConfigOpt);
+    int numProducers = options.valueOf(numProducersOpt);
+    final List<MigrationThread> migrationThreads = new ArrayList<MigrationThread>(numConsumers);
+    final List<ProducerThread> producerThreads = new ArrayList<ProducerThread>(numProducers);
 
+    try {
       File kafkaJar_07 = new File(kafkaJarFile_07);
       File zkClientJar = new File(zkClientJarFile);
-      ParentLastURLClassLoader c1 = new ParentLastURLClassLoader(new URL[]{
-          kafkaJar_07.toURI().toURL(),
-          zkClientJar.toURI().toURL()
+      ParentLastURLClassLoader c1 = new ParentLastURLClassLoader(new URL[] {
+        kafkaJar_07.toURI().toURL(),
+        zkClientJar.toURI().toURL()
       });
 
       /** Construct the 07 consumer config **/
@@ -182,7 +183,7 @@ public class KafkaMigrationTool
       Properties kafkaConsumerProperties_07 = new Properties();
       kafkaConsumerProperties_07.load(new FileInputStream(consumerConfigFile_07));
       /** Disable shallow iteration because the message format is different between 07 and 08, we have to get each individual message **/
-      if(kafkaConsumerProperties_07.getProperty("shallow.iterator.enable", "").equals("true")){
+      if(kafkaConsumerProperties_07.getProperty("shallow.iterator.enable", "").equals("true")) {
         logger.warn("Shallow iterator should not be used in the migration tool");
         kafkaConsumerProperties_07.setProperty("shallow.iterator.enable", "false");
       }
@@ -190,51 +191,73 @@ public class KafkaMigrationTool
 
       /** Construct the 07 consumer connector **/
       Method ConsumerConnectorCreationMethod_07 = KafkaStaticConsumer_07.getMethod("createJavaConsumerConnector", ConsumerConfig_07);
-
-      Object consumerConnector_07 = ConsumerConnectorCreationMethod_07.invoke(null, consumerConfig_07);
-
+      final Object consumerConnector_07 = ConsumerConnectorCreationMethod_07.invoke(null, consumerConfig_07);
       Method ConsumerConnectorCreateMessageStreamsMethod_07 = ConsumerConnector_07.getMethod(
-          "createMessageStreamsByFilter",
-          TopicFilter_07, int.class);
-
-
+        "createMessageStreamsByFilter",
+        TopicFilter_07, int.class);
+      final Method ConsumerConnectorShutdownMethod_07 = ConsumerConnector_07.getMethod("shutdown");
       Constructor WhiteListConstructor_07 = WhiteList_07.getConstructor(String.class);
       Constructor BlackListConstructor_07 = BlackList_07.getConstructor(String.class);
       Object filterSpec = null;
-
       if(options.has(whitelistOpt))
         filterSpec = WhiteListConstructor_07.newInstance(options.valueOf(whitelistOpt));
       else
         filterSpec = BlackListConstructor_07.newInstance(options.valueOf(blacklistOpt));
 
-      Object retKafkaStreams = ConsumerConnectorCreateMessageStreamsMethod_07.invoke(consumerConnector_07, filterSpec, numStreams);
+      Object retKafkaStreams = ConsumerConnectorCreateMessageStreamsMethod_07.invoke(consumerConnector_07, filterSpec, numConsumers);
 
       Properties kafkaProducerProperties_08 = new Properties();
       kafkaProducerProperties_08.load(new FileInputStream(producerConfigFile_08));
       kafkaProducerProperties_08.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
-      ProducerConfig producerConfig_08 = new ProducerConfig(kafkaProducerProperties_08);
+      // create a producer channel instead
+      ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<String, byte[]>>(numProducers);
+      int threadId = 0;
 
-      List<Producer> producers = new ArrayList<Producer>();
-      for (int i = 0; i < numProducers; i++){
-        producers.add(new Producer(producerConfig_08));
-      }
+      Runtime.getRuntime().addShutdownHook(new Thread() {
+        @Override
+        public void run() {
+          try {
+            ConsumerConnectorShutdownMethod_07.invoke(consumerConnector_07);
+          } catch(Exception e) {
+            logger.error("Error while shutting down Kafka consumer", e);
+          }
+          for(MigrationThread migrationThread : migrationThreads) {
+            migrationThread.shutdown();
+          }
+          for(ProducerThread producerThread : producerThreads) {
+            producerThread.shutdown();
+          }
+          for(ProducerThread producerThread : producerThreads) {
+            producerThread.awaitShutdown();
+          }
+          logger.info("Kafka migration tool shutdown successfully");
+        }
+      });
 
-      int threadId = 0;
-      for(Object stream: (List)retKafkaStreams){
-        MigrationThread thread = new MigrationThread(stream, producers, threadId);
+      // start consumer threads
+      for(Object stream: (List)retKafkaStreams) {
+        MigrationThread thread = new MigrationThread(stream, producerDataChannel, threadId);
         threadId ++;
         thread.start();
+        migrationThreads.add(thread);
+      }
+      // start producer threads
+      for (int i = 0; i < numProducers; i++) {
+        kafkaProducerProperties_08.put("client.id", String.valueOf(i) + "-" + i);
+        ProducerConfig producerConfig_08 = new ProducerConfig(kafkaProducerProperties_08);
+        Producer producer = new Producer(producerConfig_08);
+        ProducerThread producerThread = new ProducerThread(producerDataChannel, producer, i);
+        producerThread.start();
+        producerThreads.add(producerThread);
       }
     }
     catch (Throwable e){
-      System.out.println("Kafka migration tool failed because of " + e);
-      e.printStackTrace(System.out);
+      System.out.println("Kafka migration tool failed due to: " + Utils.stackTrace(e));
       logger.error("Kafka migration tool failed: ", e);
     }
   }
 
-  private static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec[] required) throws IOException
-    {
+  private static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec[] required) throws IOException {
     for(OptionSpec arg : required) {
       if(!options.has(arg)) {
         System.err.println("Missing required argument \"" + arg + "\"");
@@ -245,37 +268,53 @@ public class KafkaMigrationTool
   }
 
 
-  private static class MigrationThread extends Thread{
-    private Object stream;
-    private List<Producer> producers;
-    private int threadId;
-    private String threadName;
-    private org.apache.log4j.Logger logger;
+  private static class ProducerDataChannel<T> {
+    private final int producerQueueSize;
+    private final BlockingQueue<T> producerRequestQueue;
+
+    public ProducerDataChannel(int queueSize) {
+      producerQueueSize = queueSize;
+      producerRequestQueue = new ArrayBlockingQueue<T>(producerQueueSize);
+    }
+
+    public void sendRequest(T data) throws InterruptedException {
+      producerRequestQueue.put(data);
+    }
+
+    public T receiveRequest() throws InterruptedException {
+      return producerRequestQueue.take();
+    }
+  }
+
+  private static class MigrationThread extends Thread {
+    private final Object stream;
+    private final ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel;
+    private final int threadId;
+    private final String threadName;
+    private final org.apache.log4j.Logger logger;
+    private CountDownLatch shutdownComplete = new CountDownLatch(1);
+    private final AtomicBoolean isRunning = new AtomicBoolean(true);
 
-    MigrationThread(Object _stream, List<Producer> _producers, int _threadId){
+    MigrationThread(Object _stream, ProducerDataChannel<KeyedMessage<String, byte[]>> _producerDataChannel, int _threadId) {
       stream = _stream;
-      producers = _producers;
+      producerDataChannel = _producerDataChannel;
       threadId = _threadId;
       threadName = "MigrationThread-" + threadId;
-      logger = org.apache.log4j.Logger.getLogger(threadName);
+      logger = org.apache.log4j.Logger.getLogger(MigrationThread.class.getName());
       this.setName(threadName);
     }
 
-    public void run(){
-      try{
+    public void run() {
+      try {
         Method MessageGetPayloadMethod_07 = KafkaMessageClass_07.getMethod("payload");
         Method KafkaGetMessageMethod_07 = KafkaMessageAndMetatDataClass_07.getMethod("message");
         Method KafkaGetTopicMethod_07 = KafkaMessageAndMetatDataClass_07.getMethod("topic");
-
         Method ConsumerIteratorMethod = KafkaStream_07.getMethod("iterator");
         Method KafkaStreamHasNextMethod_07 = KafkaConsumerIteratorClass_07.getMethod("hasNext");
         Method KafkaStreamNextMethod_07 = KafkaConsumerIteratorClass_07.getMethod("next");
-
         Object iterator = ConsumerIteratorMethod.invoke(stream);
 
-        Iterator<Producer> producerCircularIterator = Utils.circularIterator(JavaConversions.asBuffer(producers));
-
-        while (((Boolean) KafkaStreamHasNextMethod_07.invoke(iterator)).booleanValue()){
+        while (((Boolean) KafkaStreamHasNextMethod_07.invoke(iterator)).booleanValue()) {
           Object messageAndMetaData_07 = KafkaStreamNextMethod_07.invoke(iterator);
           Object message_07 = KafkaGetMessageMethod_07.invoke(messageAndMetaData_07);
           Object topic = KafkaGetTopicMethod_07.invoke(messageAndMetaData_07);
@@ -283,36 +322,106 @@ public class KafkaMigrationTool
           int size = ((ByteBuffer)payload_07).remaining();
           byte[] bytes = new byte[size];
           ((ByteBuffer)payload_07).get(bytes);
-          logger.debug(String.format("Send kafka 08 message of size %d to topic %s", bytes.length, topic));
+          if(logger.isDebugEnabled())
+            logger.debug("Migration thread " + threadId + " sending message of size " + bytes.length + " to topic "+ topic);
           KeyedMessage<String, byte[]> producerData = new KeyedMessage((String)topic, null, bytes);
-          Producer nextProducer = producerCircularIterator.next();
-          nextProducer.send(producerData);
+          producerDataChannel.sendRequest(producerData);
         }
-        logger.info(String.format("Migration thread %s finishes running", threadName));
+        logger.info("Migration thread " + threadName + " finished running");
+      } catch (InvocationTargetException t){
+        logger.fatal("Migration thread failure due to root cause ", t.getCause());
       } catch (Throwable t){
         logger.fatal("Migration thread failure due to ", t);
-        t.printStackTrace(System.out);
+      } finally {
+        shutdownComplete.countDown();
+      }
+    }
+
+    public void shutdown() {
+      logger.info("Migration thread " + threadName + " shutting down");
+      isRunning.set(false);
+      interrupt();
+      try {
+        shutdownComplete.await();
+      } catch(InterruptedException ie) {
+        logger.warn("Interrupt during shutdown of MigrationThread", ie);
       }
+      logger.info("Migration thread " + threadName + " shutdown complete");
     }
   }
 
+  private static class ProducerThread extends Thread {
+    private final ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel;
+    private final Producer<String, byte[]> producer;
+    private final int threadId;
+    private String threadName;
+    private org.apache.log4j.Logger logger;
+    private CountDownLatch shutdownComplete = new CountDownLatch(1);
+    private KeyedMessage<String, byte[]> shutdownMessage = new KeyedMessage("shutdown", null, null);
+
+    public ProducerThread(ProducerDataChannel<KeyedMessage<String, byte[]>> _producerDataChannel,
+                          Producer<String, byte[]> _producer,
+                          int _threadId) {
+      producerDataChannel = _producerDataChannel;
+      producer = _producer;
+      threadId = _threadId;
+      threadName = "ProducerThread-" + threadId;
+      logger = org.apache.log4j.Logger.getLogger(ProducerThread.class.getName());
+      this.setName(threadName);
+    }
+
+    public void run() {
+      try{
+        while(true) {
+          KeyedMessage<String, byte[]> data = producerDataChannel.receiveRequest();
+          if(!data.equals(shutdownMessage))
+            producer.send(data);
+          else
+            break;
+        }
+        logger.info("Producer thread " + threadName + " finished running");
+      } catch (Throwable t){
+        logger.fatal("Producer thread failure due to ", t);
+      } finally {
+        shutdownComplete.countDown();
+      }
+    }
+
+    public void shutdown() {
+      try {
+        logger.info("Producer thread " + threadName + " shutting down");
+        producerDataChannel.sendRequest(shutdownMessage);
+      } catch(InterruptedException ie) {
+        logger.warn("Interrupt during shutdown of ProducerThread", ie);
+      }
+    }
+
+    public void awaitShutdown() {
+      try {
+        shutdownComplete.await();
+        logger.info("Producer thread " + threadName + " shutdown complete");
+      } catch(InterruptedException ie) {
+        logger.warn("Interrupt during shutdown of ProducerThread", ie);
+      }
+    }
+  }
 
   /**
-   * A parent-last classloader that will try the child classloader first and then the parent.
+   * A parent-last class loader that will try the child class loader first and then the parent.
    * This takes a fair bit of doing because java really prefers parent-first.
    */
-  private static class ParentLastURLClassLoader extends ClassLoader{
+  private static class ParentLastURLClassLoader extends ClassLoader {
     private ChildURLClassLoader childClassLoader;
 
     /**
-     * This class allows me to call findClass on a classloader
+     * This class allows me to call findClass on a class loader
      */
-    private static class FindClassClassLoader extends ClassLoader{
-      public FindClassClassLoader(ClassLoader parent){
+    private static class FindClassClassLoader extends ClassLoader {
+      public FindClassClassLoader(ClassLoader parent) {
         super(parent);
       }
       @Override
-      public Class<?> findClass(String name) throws ClassNotFoundException{
+      public Class<?> findClass(String name) throws ClassNotFoundException {
         return super.findClass(name);
       }
     }
@@ -327,14 +436,15 @@ public class KafkaMigrationTool
         super(urls, null);
         this.realParent = realParent;
       }
+
       @Override
-      public Class<?> findClass(String name) throws ClassNotFoundException{
+      public Class<?> findClass(String name) throws ClassNotFoundException {
         try{
           // first try to use the URLClassLoader findClass
           return super.findClass(name);
         }
-        catch( ClassNotFoundException e ){
-          // if that fails, we ask our real parent classloader to load the class (we give up)
+        catch( ClassNotFoundException e ) {
+          // if that fails, we ask our real parent class loader to load the class (we give up)
           return realParent.loadClass(name);
         }
       }
@@ -347,11 +457,11 @@ public class KafkaMigrationTool
 
     @Override
     protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
-      try{
-        // first we try to find a class inside the child classloader
+      try {
+        // first we try to find a class inside the child class loader
         return childClassLoader.findClass(name);
       }
-      catch( ClassNotFoundException e ){
+      catch( ClassNotFoundException e ) {
         // didn't find it, try the parent
         return super.loadClass(name, resolve);
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d925b157/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 942d6c3..fe4c925 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -25,6 +25,7 @@ import java.lang.management._
 import java.util.zip.CRC32
 import javax.management._
 import scala.collection._
+import mutable.ListBuffer
 import scala.collection.mutable
 import java.util.Properties
 import kafka.common.KafkaException
@@ -572,5 +573,4 @@ object Utils extends Logging {
    * This is different from java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).
    */
   def abs(n: Int) = n & 0x7fffffff
-  
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d925b157/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index cce6c8e..0b6244f 100644
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -21,9 +21,10 @@ import java.util.Arrays
 import java.nio.ByteBuffer
 import org.apache.log4j.Logger
 import org.scalatest.junit.JUnitSuite
-import org.junit.Test
 import org.junit.Assert._
 import kafka.common.KafkaException
+import org.junit.{Test}
+import kafka.tools.KafkaMigrationTool
 
 
 class UtilsTest extends JUnitSuite {
@@ -53,7 +54,7 @@ class UtilsTest extends JUnitSuite {
     assertEquals(2, its.next())
     assertEquals(1, its.next())
   }
-  
+
   @Test
   def testReadBytes() {
     for(testCase <- List("", "a", "abcd")) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d925b157/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index 507743e..851a99e 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -246,7 +246,7 @@ object ProducerPerformance extends Logging {
       while(j < messagesPerThread) {
         try {
           config.topics.foreach(
-            topic =>{
+            topic => {
               val (producerData, bytesSent_) = generateProducerData(topic, j)
               bytesSent += bytesSent_
               producer.send(producerData)