You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/02/24 23:27:52 UTC
git commit: KAFKA-734 Migration tool needs a revamp, it was poorly
written and has many performance bugs; reviewed by Jun Rao
Updated Branches:
refs/heads/0.8 ccfdabc04 -> d925b157f
KAFKA-734 Migration tool needs a revamp, it was poorly written and has many performance bugs; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d925b157
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d925b157
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d925b157
Branch: refs/heads/0.8
Commit: d925b157f42b13cb410a86e8850c23a11de3d2f1
Parents: ccfdabc
Author: Neha Narkhede <ne...@gmail.com>
Authored: Sun Feb 24 14:27:21 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Sun Feb 24 14:27:21 2013 -0800
----------------------------------------------------------------------
.../consumer/ZookeeperConsumerConnector.scala | 1 +
core/src/main/scala/kafka/producer/Producer.scala | 3 +-
.../main/scala/kafka/tools/KafkaMigrationTool.java | 364 ++++++++++-----
core/src/main/scala/kafka/utils/Utils.scala | 2 +-
.../test/scala/unit/kafka/utils/UtilsTest.scala | 5 +-
.../scala/kafka/perf/ProducerPerformance.scala | 2 +-
6 files changed, 244 insertions(+), 133 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d925b157/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 9db9a8b..c9e4127 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -646,6 +646,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val topicThreadId = e._1
val q = e._2._1
topicThreadIdAndQueues.put(topicThreadId, q)
+ debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString))
newGauge(
config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
new Gauge[Int] {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d925b157/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index f7d85b9..c837091 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -40,8 +40,7 @@ class Producer[K,V](val config: ProducerConfig,
case "sync" =>
case "async" =>
sync = false
- val asyncProducerID = random.nextInt(Int.MaxValue)
- producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + asyncProducerID,
+ producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
queue,
eventHandler,
config.queueBufferingMaxMs,
http://git-wip-us.apache.org/repos/asf/kafka/blob/d925b157/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index dbbddae..f3a5095 100644
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -22,13 +22,12 @@ import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.utils.Utils;
-import scala.collection.Iterator;
-import scala.collection.JavaConversions;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
@@ -36,27 +35,28 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
- * The kafka 07 to 08 online migration tool, it's used for migrating data from 07 to 08 cluster. Internally,
- * it's composed of a kafka 07 consumer and kafka 08 producer. The kafka 07 consumer consumes data from the
- * 07 cluster, and the kafka 08 producer produces data to the 08 cluster.
+ * This is a kafka 0.7 to 0.8 online migration tool used for migrating data from 0.7 to 0.8 cluster. Internally,
+ * it's composed of a kafka 0.7 consumer and kafka 0.8 producer. The kafka 0.7 consumer consumes data from the
+ * 0.7 cluster, and the kafka 0.8 producer produces data to the 0.8 cluster.
*
- * The 07 consumer is loaded from kafka 07 jar using a "parent last, child first" java class loader.
- * Ordinary class loader is "parent first, child last", and kafka 08 and 07 both have classes for a lot of
- * class names like "kafka.consumer.Consumer", etc., so ordinary java URLClassLoader with kafka 07 jar will
- * will still load the 08 version class.
+ * The 0.7 consumer is loaded from kafka 0.7 jar using a "parent last, child first" java class loader.
+ * Ordinary class loader is "parent first, child last", and kafka 0.8 and 0.7 both have classes for a lot of
+ * class names like "kafka.consumer.Consumer", etc., so ordinary java URLClassLoader with kafka 0.7 jar will
+ * still load the 0.8 version class.
*
- * As kafka 07 and kafka 08 used different version of zkClient, the zkClient jar used by kafka 07 should
+ * As kafka 0.7 and kafka 0.8 use different versions of zkClient, the zkClient jar used by kafka 0.7 should
* also be used by the class loader.
*
- * The user need to provide the configuration file for 07 consumer and 08 producer. For 08 producer,
- * the "serializer.class" filed is set to "kafka.serializer.DefaultEncode" by the code.
+ * The user needs to provide the configuration files for the 0.7 consumer and the 0.8 producer. For the 0.8 producer,
+ * the "serializer.class" config is set to "kafka.serializer.DefaultEncoder" by the code.
*/
-public class KafkaMigrationTool
-{
+public class KafkaMigrationTool {
private static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName());
private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "kafka.consumer.Consumer";
private static final String KAFKA_07_CONSUMER_CONFIG_CLASS_NAME = "kafka.consumer.ConsumerConfig";
@@ -80,90 +80,91 @@ public class KafkaMigrationTool
private static Class<?> KafkaMessageAndMetatDataClass_07 = null;
private static Class<?> KafkaMessageClass_07 = null;
- public static void main(String[] args){
+ public static void main(String[] args) throws InterruptedException, IOException {
OptionParser parser = new OptionParser();
ArgumentAcceptingOptionSpec<String> consumerConfigOpt
- = parser.accepts("consumer.config", "Kafka 0.7 consumer config to consume from the source cluster. " + "You man specify multiple of these.")
- .withRequiredArg()
- .describedAs("config file")
- .ofType(String.class);
+ = parser.accepts("consumer.config", "Kafka 0.7 consumer config to consume from the source 0.7 cluster. " + "You man specify multiple of these.")
+ .withRequiredArg()
+ .describedAs("config file")
+ .ofType(String.class);
ArgumentAcceptingOptionSpec<String> producerConfigOpt
- = parser.accepts("producer.config", "Embedded producer config.")
- .withRequiredArg()
- .describedAs("config file")
- .ofType(String.class);
+ = parser.accepts("producer.config", "Producer config.")
+ .withRequiredArg()
+ .describedAs("config file")
+ .ofType(String.class);
ArgumentAcceptingOptionSpec<Integer> numProducersOpt
- = parser.accepts("num.producers", "Number of producer instances")
- .withRequiredArg()
- .describedAs("Number of producers")
- .ofType(Integer.class)
- .defaultsTo(1);
+ = parser.accepts("num.producers", "Number of producer instances")
+ .withRequiredArg()
+ .describedAs("Number of producers")
+ .ofType(Integer.class)
+ .defaultsTo(1);
ArgumentAcceptingOptionSpec<String> zkClient01JarOpt
- = parser.accepts("zkclient.01.jar", "zkClient 0.1 jar file")
- .withRequiredArg()
- .describedAs("zkClient 0.1 jar file required by Kafka 0.7")
- .ofType(String.class);
+ = parser.accepts("zkclient.01.jar", "zkClient 0.1 jar file")
+ .withRequiredArg()
+ .describedAs("zkClient 0.1 jar file required by Kafka 0.7")
+ .ofType(String.class);
ArgumentAcceptingOptionSpec<String> kafka07JarOpt
- = parser.accepts("kafka.07.jar", "Kafka 0.7 jar file")
- .withRequiredArg()
- .describedAs("kafka 0.7 jar")
- .ofType(String.class);
+ = parser.accepts("kafka.07.jar", "Kafka 0.7 jar file")
+ .withRequiredArg()
+ .describedAs("kafka 0.7 jar")
+ .ofType(String.class);
ArgumentAcceptingOptionSpec<Integer> numStreamsOpt
- = parser.accepts("num.streams", "Number of consumption streams.")
- .withRequiredArg()
- .describedAs("Number of threads")
- .ofType(Integer.class)
- .defaultsTo(1);
+ = parser.accepts("num.streams", "Number of consumer streams")
+ .withRequiredArg()
+ .describedAs("Number of consumer threads")
+ .ofType(Integer.class)
+ .defaultsTo(1);
ArgumentAcceptingOptionSpec<String> whitelistOpt
- = parser.accepts("whitelist", "Whitelist of topics to mirror.")
- .withRequiredArg()
- .describedAs("Java regex (String)")
- .ofType(String.class);
+ = parser.accepts("whitelist", "Whitelist of topics to migrate from the 0.7 cluster")
+ .withRequiredArg()
+ .describedAs("Java regex (String)")
+ .ofType(String.class);
ArgumentAcceptingOptionSpec<String> blacklistOpt
- = parser.accepts("blacklist", "Blacklist of topics to mirror.")
- .withRequiredArg()
- .describedAs("Java regex (String)")
- .ofType(String.class);
+ = parser.accepts("blacklist", "Blacklist of topics to migrate from the 0.7 cluster")
+ .withRequiredArg()
+ .describedAs("Java regex (String)")
+ .ofType(String.class);
OptionSpecBuilder helpOpt
- = parser.accepts("help", "Print this message.");
+ = parser.accepts("help", "Print this message.");
OptionSet options = parser.parse(args);
- try{
- if (options.has(helpOpt)){
- parser.printHelpOn(System.out);
- System.exit(0);
- }
-
- checkRequiredArgs(parser, options, new OptionSpec[]{consumerConfigOpt, producerConfigOpt, zkClient01JarOpt, kafka07JarOpt});
- int whiteListCount = options.has(whitelistOpt) ? 1 : 0;
- int blackListCount = options.has(blacklistOpt) ? 1 : 0;
- if(whiteListCount + blackListCount != 1){
- System.err.println("Exactly one of whitelist or blacklist is required.");
- System.exit(1);
- }
+ if (options.has(helpOpt)) {
+ parser.printHelpOn(System.out);
+ System.exit(0);
+ }
- String kafkaJarFile_07 = options.valueOf(kafka07JarOpt);
- String zkClientJarFile = options.valueOf(zkClient01JarOpt);
- String consumerConfigFile_07 = options.valueOf(consumerConfigOpt);
- int numStreams = options.valueOf(numStreamsOpt);
- String producerConfigFile_08 = options.valueOf(producerConfigOpt);
- int numProducers = options.valueOf(numProducersOpt);
+ checkRequiredArgs(parser, options, new OptionSpec[]{consumerConfigOpt, producerConfigOpt, zkClient01JarOpt, kafka07JarOpt});
+ int whiteListCount = options.has(whitelistOpt) ? 1 : 0;
+ int blackListCount = options.has(blacklistOpt) ? 1 : 0;
+ if(whiteListCount + blackListCount != 1) {
+ System.err.println("Exactly one of whitelist or blacklist is required.");
+ System.exit(1);
+ }
+ String kafkaJarFile_07 = options.valueOf(kafka07JarOpt);
+ String zkClientJarFile = options.valueOf(zkClient01JarOpt);
+ String consumerConfigFile_07 = options.valueOf(consumerConfigOpt);
+ int numConsumers = options.valueOf(numStreamsOpt);
+ String producerConfigFile_08 = options.valueOf(producerConfigOpt);
+ int numProducers = options.valueOf(numProducersOpt);
+ final List<MigrationThread> migrationThreads = new ArrayList<MigrationThread>(numConsumers);
+ final List<ProducerThread> producerThreads = new ArrayList<ProducerThread>(numProducers);
+ try {
File kafkaJar_07 = new File(kafkaJarFile_07);
File zkClientJar = new File(zkClientJarFile);
- ParentLastURLClassLoader c1 = new ParentLastURLClassLoader(new URL[]{
- kafkaJar_07.toURI().toURL(),
- zkClientJar.toURI().toURL()
+ ParentLastURLClassLoader c1 = new ParentLastURLClassLoader(new URL[] {
+ kafkaJar_07.toURI().toURL(),
+ zkClientJar.toURI().toURL()
});
/** Construct the 07 consumer config **/
@@ -182,7 +183,7 @@ public class KafkaMigrationTool
Properties kafkaConsumerProperties_07 = new Properties();
kafkaConsumerProperties_07.load(new FileInputStream(consumerConfigFile_07));
/** Disable shallow iteration because the message format is different between 07 and 08, we have to get each individual message **/
- if(kafkaConsumerProperties_07.getProperty("shallow.iterator.enable", "").equals("true")){
+ if(kafkaConsumerProperties_07.getProperty("shallow.iterator.enable", "").equals("true")) {
logger.warn("Shallow iterator should not be used in the migration tool");
kafkaConsumerProperties_07.setProperty("shallow.iterator.enable", "false");
}
@@ -190,51 +191,73 @@ public class KafkaMigrationTool
/** Construct the 07 consumer connector **/
Method ConsumerConnectorCreationMethod_07 = KafkaStaticConsumer_07.getMethod("createJavaConsumerConnector", ConsumerConfig_07);
-
- Object consumerConnector_07 = ConsumerConnectorCreationMethod_07.invoke(null, consumerConfig_07);
-
+ final Object consumerConnector_07 = ConsumerConnectorCreationMethod_07.invoke(null, consumerConfig_07);
Method ConsumerConnectorCreateMessageStreamsMethod_07 = ConsumerConnector_07.getMethod(
- "createMessageStreamsByFilter",
- TopicFilter_07, int.class);
-
-
+ "createMessageStreamsByFilter",
+ TopicFilter_07, int.class);
+ final Method ConsumerConnectorShutdownMethod_07 = ConsumerConnector_07.getMethod("shutdown");
Constructor WhiteListConstructor_07 = WhiteList_07.getConstructor(String.class);
Constructor BlackListConstructor_07 = BlackList_07.getConstructor(String.class);
Object filterSpec = null;
-
if(options.has(whitelistOpt))
filterSpec = WhiteListConstructor_07.newInstance(options.valueOf(whitelistOpt));
else
filterSpec = BlackListConstructor_07.newInstance(options.valueOf(blacklistOpt));
- Object retKafkaStreams = ConsumerConnectorCreateMessageStreamsMethod_07.invoke(consumerConnector_07, filterSpec, numStreams);
+ Object retKafkaStreams = ConsumerConnectorCreateMessageStreamsMethod_07.invoke(consumerConnector_07, filterSpec, numConsumers);
Properties kafkaProducerProperties_08 = new Properties();
kafkaProducerProperties_08.load(new FileInputStream(producerConfigFile_08));
kafkaProducerProperties_08.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
- ProducerConfig producerConfig_08 = new ProducerConfig(kafkaProducerProperties_08);
+ // create a producer channel instead
+ ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<String, byte[]>>(numProducers);
+ int threadId = 0;
- List<Producer> producers = new ArrayList<Producer>();
- for (int i = 0; i < numProducers; i++){
- producers.add(new Producer(producerConfig_08));
- }
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ try {
+ ConsumerConnectorShutdownMethod_07.invoke(consumerConnector_07);
+ } catch(Exception e) {
+ logger.error("Error while shutting down Kafka consumer", e);
+ }
+ for(MigrationThread migrationThread : migrationThreads) {
+ migrationThread.shutdown();
+ }
+ for(ProducerThread producerThread : producerThreads) {
+ producerThread.shutdown();
+ }
+ for(ProducerThread producerThread : producerThreads) {
+ producerThread.awaitShutdown();
+ }
+ logger.info("Kafka migration tool shutdown successfully");
+ }
+ });
- int threadId = 0;
- for(Object stream: (List)retKafkaStreams){
- MigrationThread thread = new MigrationThread(stream, producers, threadId);
+ // start consumer threads
+ for(Object stream: (List)retKafkaStreams) {
+ MigrationThread thread = new MigrationThread(stream, producerDataChannel, threadId);
threadId ++;
thread.start();
+ migrationThreads.add(thread);
+ }
+ // start producer threads
+ for (int i = 0; i < numProducers; i++) {
+ kafkaProducerProperties_08.put("client.id", String.valueOf(i) + "-" + i);
+ ProducerConfig producerConfig_08 = new ProducerConfig(kafkaProducerProperties_08);
+ Producer producer = new Producer(producerConfig_08);
+ ProducerThread producerThread = new ProducerThread(producerDataChannel, producer, i);
+ producerThread.start();
+ producerThreads.add(producerThread);
}
}
catch (Throwable e){
- System.out.println("Kafka migration tool failed because of " + e);
- e.printStackTrace(System.out);
+ System.out.println("Kafka migration tool failed due to: " + Utils.stackTrace(e));
logger.error("Kafka migration tool failed: ", e);
}
}
- private static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec[] required) throws IOException
- {
+ private static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec[] required) throws IOException {
for(OptionSpec arg : required) {
if(!options.has(arg)) {
System.err.println("Missing required argument \"" + arg + "\"");
@@ -245,37 +268,53 @@ public class KafkaMigrationTool
}
- private static class MigrationThread extends Thread{
- private Object stream;
- private List<Producer> producers;
- private int threadId;
- private String threadName;
- private org.apache.log4j.Logger logger;
+ private static class ProducerDataChannel<T> {
+ private final int producerQueueSize;
+ private final BlockingQueue<T> producerRequestQueue;
+
+ public ProducerDataChannel(int queueSize) {
+ producerQueueSize = queueSize;
+ producerRequestQueue = new ArrayBlockingQueue<T>(producerQueueSize);
+ }
+
+ public void sendRequest(T data) throws InterruptedException {
+ producerRequestQueue.put(data);
+ }
+
+ public T receiveRequest() throws InterruptedException {
+ return producerRequestQueue.take();
+ }
+ }
+
+ private static class MigrationThread extends Thread {
+ private final Object stream;
+ private final ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel;
+ private final int threadId;
+ private final String threadName;
+ private final org.apache.log4j.Logger logger;
+ private CountDownLatch shutdownComplete = new CountDownLatch(1);
+ private final AtomicBoolean isRunning = new AtomicBoolean(true);
- MigrationThread(Object _stream, List<Producer> _producers, int _threadId){
+ MigrationThread(Object _stream, ProducerDataChannel<KeyedMessage<String, byte[]>> _producerDataChannel, int _threadId) {
stream = _stream;
- producers = _producers;
+ producerDataChannel = _producerDataChannel;
threadId = _threadId;
threadName = "MigrationThread-" + threadId;
- logger = org.apache.log4j.Logger.getLogger(threadName);
+ logger = org.apache.log4j.Logger.getLogger(MigrationThread.class.getName());
this.setName(threadName);
}
- public void run(){
- try{
+ public void run() {
+ try {
Method MessageGetPayloadMethod_07 = KafkaMessageClass_07.getMethod("payload");
Method KafkaGetMessageMethod_07 = KafkaMessageAndMetatDataClass_07.getMethod("message");
Method KafkaGetTopicMethod_07 = KafkaMessageAndMetatDataClass_07.getMethod("topic");
-
Method ConsumerIteratorMethod = KafkaStream_07.getMethod("iterator");
Method KafkaStreamHasNextMethod_07 = KafkaConsumerIteratorClass_07.getMethod("hasNext");
Method KafkaStreamNextMethod_07 = KafkaConsumerIteratorClass_07.getMethod("next");
-
Object iterator = ConsumerIteratorMethod.invoke(stream);
- Iterator<Producer> producerCircularIterator = Utils.circularIterator(JavaConversions.asBuffer(producers));
-
- while (((Boolean) KafkaStreamHasNextMethod_07.invoke(iterator)).booleanValue()){
+ while (((Boolean) KafkaStreamHasNextMethod_07.invoke(iterator)).booleanValue()) {
Object messageAndMetaData_07 = KafkaStreamNextMethod_07.invoke(iterator);
Object message_07 = KafkaGetMessageMethod_07.invoke(messageAndMetaData_07);
Object topic = KafkaGetTopicMethod_07.invoke(messageAndMetaData_07);
@@ -283,36 +322,106 @@ public class KafkaMigrationTool
int size = ((ByteBuffer)payload_07).remaining();
byte[] bytes = new byte[size];
((ByteBuffer)payload_07).get(bytes);
- logger.debug(String.format("Send kafka 08 message of size %d to topic %s", bytes.length, topic));
+ if(logger.isDebugEnabled())
+ logger.debug("Migration thread " + threadId + " sending message of size " + bytes.length + " to topic "+ topic);
KeyedMessage<String, byte[]> producerData = new KeyedMessage((String)topic, null, bytes);
- Producer nextProducer = producerCircularIterator.next();
- nextProducer.send(producerData);
+ producerDataChannel.sendRequest(producerData);
}
- logger.info(String.format("Migration thread %s finishes running", threadName));
+ logger.info("Migration thread " + threadName + " finished running");
+ } catch (InvocationTargetException t){
+ logger.fatal("Migration thread failure due to root cause ", t.getCause());
} catch (Throwable t){
logger.fatal("Migration thread failure due to ", t);
- t.printStackTrace(System.out);
+ } finally {
+ shutdownComplete.countDown();
+ }
+ }
+
+ public void shutdown() {
+ logger.info("Migration thread " + threadName + " shutting down");
+ isRunning.set(false);
+ interrupt();
+ try {
+ shutdownComplete.await();
+ } catch(InterruptedException ie) {
+ logger.warn("Interrupt during shutdown of MigrationThread", ie);
}
+ logger.info("Migration thread " + threadName + " shutdown complete");
}
}
+ private static class ProducerThread extends Thread {
+ private final ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel;
+ private final Producer<String, byte[]> producer;
+ private final int threadId;
+ private String threadName;
+ private org.apache.log4j.Logger logger;
+ private CountDownLatch shutdownComplete = new CountDownLatch(1);
+ private KeyedMessage<String, byte[]> shutdownMessage = new KeyedMessage("shutdown", null, null);
+
+ public ProducerThread(ProducerDataChannel<KeyedMessage<String, byte[]>> _producerDataChannel,
+ Producer<String, byte[]> _producer,
+ int _threadId) {
+ producerDataChannel = _producerDataChannel;
+ producer = _producer;
+ threadId = _threadId;
+ threadName = "ProducerThread-" + threadId;
+ logger = org.apache.log4j.Logger.getLogger(ProducerThread.class.getName());
+ this.setName(threadName);
+ }
+
+ public void run() {
+ try{
+ while(true) {
+ KeyedMessage<String, byte[]> data = producerDataChannel.receiveRequest();
+ if(!data.equals(shutdownMessage))
+ producer.send(data);
+ else
+ break;
+ }
+ logger.info("Producer thread " + threadName + " finished running");
+ } catch (Throwable t){
+ logger.fatal("Producer thread failure due to ", t);
+ } finally {
+ shutdownComplete.countDown();
+ }
+ }
+
+ public void shutdown() {
+ try {
+ logger.info("Producer thread " + threadName + " shutting down");
+ producerDataChannel.sendRequest(shutdownMessage);
+ } catch(InterruptedException ie) {
+ logger.warn("Interrupt during shutdown of ProducerThread", ie);
+ }
+ }
+
+ public void awaitShutdown() {
+ try {
+ shutdownComplete.await();
+ logger.info("Producer thread " + threadName + " shutdown complete");
+ } catch(InterruptedException ie) {
+ logger.warn("Interrupt during shutdown of ProducerThread", ie);
+ }
+ }
+ }
/**
- * A parent-last classloader that will try the child classloader first and then the parent.
+ * A parent-last class loader that will try the child class loader first and then the parent.
* This takes a fair bit of doing because java really prefers parent-first.
*/
- private static class ParentLastURLClassLoader extends ClassLoader{
+ private static class ParentLastURLClassLoader extends ClassLoader {
private ChildURLClassLoader childClassLoader;
/**
- * This class allows me to call findClass on a classloader
+ * This class allows me to call findClass on a class loader
*/
- private static class FindClassClassLoader extends ClassLoader{
- public FindClassClassLoader(ClassLoader parent){
+ private static class FindClassClassLoader extends ClassLoader {
+ public FindClassClassLoader(ClassLoader parent) {
super(parent);
}
@Override
- public Class<?> findClass(String name) throws ClassNotFoundException{
+ public Class<?> findClass(String name) throws ClassNotFoundException {
return super.findClass(name);
}
}
@@ -327,14 +436,15 @@ public class KafkaMigrationTool
super(urls, null);
this.realParent = realParent;
}
+
@Override
- public Class<?> findClass(String name) throws ClassNotFoundException{
+ public Class<?> findClass(String name) throws ClassNotFoundException {
try{
// first try to use the URLClassLoader findClass
return super.findClass(name);
}
- catch( ClassNotFoundException e ){
- // if that fails, we ask our real parent classloader to load the class (we give up)
+ catch( ClassNotFoundException e ) {
+ // if that fails, we ask our real parent class loader to load the class (we give up)
return realParent.loadClass(name);
}
}
@@ -347,11 +457,11 @@ public class KafkaMigrationTool
@Override
protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
- try{
- // first we try to find a class inside the child classloader
+ try {
+ // first we try to find a class inside the child class loader
return childClassLoader.findClass(name);
}
- catch( ClassNotFoundException e ){
+ catch( ClassNotFoundException e ) {
// didn't find it, try the parent
return super.loadClass(name, resolve);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d925b157/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 942d6c3..fe4c925 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -25,6 +25,7 @@ import java.lang.management._
import java.util.zip.CRC32
import javax.management._
import scala.collection._
+import mutable.ListBuffer
import scala.collection.mutable
import java.util.Properties
import kafka.common.KafkaException
@@ -572,5 +573,4 @@ object Utils extends Logging {
* This is different from java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).
*/
def abs(n: Int) = n & 0x7fffffff
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/d925b157/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index cce6c8e..0b6244f 100644
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -21,9 +21,10 @@ import java.util.Arrays
import java.nio.ByteBuffer
import org.apache.log4j.Logger
import org.scalatest.junit.JUnitSuite
-import org.junit.Test
import org.junit.Assert._
import kafka.common.KafkaException
+import org.junit.{Test}
+import kafka.tools.KafkaMigrationTool
class UtilsTest extends JUnitSuite {
@@ -53,7 +54,7 @@ class UtilsTest extends JUnitSuite {
assertEquals(2, its.next())
assertEquals(1, its.next())
}
-
+
@Test
def testReadBytes() {
for(testCase <- List("", "a", "abcd")) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d925b157/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index 507743e..851a99e 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -246,7 +246,7 @@ object ProducerPerformance extends Logging {
while(j < messagesPerThread) {
try {
config.topics.foreach(
- topic =>{
+ topic => {
val (producerData, bytesSent_) = generateProducerData(topic, j)
bytesSent += bytesSent_
producer.send(producerData)