Posted to commits@kafka.apache.org by ju...@apache.org on 2012/04/07 02:04:54 UTC
svn commit: r1310645 [1/2] - in /incubator/kafka/trunk:
contrib/hadoop-consumer/src/main/java/kafka/etl/
contrib/hadoop-consumer/src/main/java/kafka/etl/impl/
contrib/hadoop-producer/src/main/java/kafka/bridge/examples/
contrib/hadoop-producer/src/main...
Author: junrao
Date: Sat Apr 7 00:04:51 2012
New Revision: 1310645
URL: http://svn.apache.org/viewvc?rev=1310645&view=rev
Log:
Separate out Kafka mirroring into a stand-alone app; patched by Joel Koshy; reviewed by Jun Rao and Neha Narkhede; KAFKA-249
Added:
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaStream.scala
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicFilter.scala
incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndMetadata.scala
incubator/kafka/trunk/core/src/main/scala/kafka/tools/MirrorMaker.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
incubator/kafka/trunk/system_test/mirror_maker/
incubator/kafka/trunk/system_test/mirror_maker/README
incubator/kafka/trunk/system_test/mirror_maker/bin/
incubator/kafka/trunk/system_test/mirror_maker/bin/expected.out
incubator/kafka/trunk/system_test/mirror_maker/bin/run-test.sh
incubator/kafka/trunk/system_test/mirror_maker/config/
incubator/kafka/trunk/system_test/mirror_maker/config/blacklisttest.consumer.properties
incubator/kafka/trunk/system_test/mirror_maker/config/mirror_producer.properties
incubator/kafka/trunk/system_test/mirror_maker/config/server_source_1_1.properties
incubator/kafka/trunk/system_test/mirror_maker/config/server_source_1_2.properties
incubator/kafka/trunk/system_test/mirror_maker/config/server_source_2_1.properties
incubator/kafka/trunk/system_test/mirror_maker/config/server_source_2_2.properties
incubator/kafka/trunk/system_test/mirror_maker/config/server_target_1_1.properties
incubator/kafka/trunk/system_test/mirror_maker/config/server_target_1_2.properties
incubator/kafka/trunk/system_test/mirror_maker/config/whitelisttest_1.consumer.properties
incubator/kafka/trunk/system_test/mirror_maker/config/whitelisttest_2.consumer.properties
incubator/kafka/trunk/system_test/mirror_maker/config/zookeeper_source_1.properties
incubator/kafka/trunk/system_test/mirror_maker/config/zookeeper_source_2.properties
incubator/kafka/trunk/system_test/mirror_maker/config/zookeeper_target.properties
Removed:
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala
incubator/kafka/trunk/system_test/embedded_consumer/
Modified:
incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java
incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java
incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java
incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java
incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java
incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java
incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndOffset.scala
incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala
incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala
incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala
incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
incubator/kafka/trunk/core/src/main/scala/kafka/utils/Logging.scala
incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala
incubator/kafka/trunk/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
incubator/kafka/trunk/examples/src/main/java/kafka/examples/Consumer.java
incubator/kafka/trunk/examples/src/main/java/kafka/examples/ExampleUtils.java
incubator/kafka/trunk/examples/src/main/java/kafka/examples/Producer.java
incubator/kafka/trunk/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
incubator/kafka/trunk/perf/src/main/scala/kafka/perf/PerfConfig.scala
incubator/kafka/trunk/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
Modified: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java (original)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java Sat Apr 7 00:04:51 2012
@@ -16,27 +16,25 @@
*/
package kafka.etl;
+
import java.io.IOException;
import java.net.URI;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.zip.CRC32;
import kafka.api.FetchRequest;
-import kafka.javaapi.MultiFetchResponse;
import kafka.api.OffsetRequest;
import kafka.common.ErrorMapping;
+import kafka.javaapi.MultiFetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
import kafka.message.MessageAndOffset;
-import kafka.message.MessageSet;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
-import java.nio.ByteBuffer;
@SuppressWarnings({ "deprecation"})
public class KafkaETLContext {
@@ -139,7 +137,7 @@ public class KafkaETLContext {
while ( !gotNext && _respIterator.hasNext()) {
ByteBufferMessageSet msgSet = _respIterator.next();
if ( hasError(msgSet)) return false;
- _messageIt = (Iterator<MessageAndOffset>) msgSet.iterator();
+ _messageIt = msgSet.iterator();
gotNext = get(key, value);
}
}
@@ -190,17 +188,17 @@ public class KafkaETLContext {
protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
if (_messageIt != null && _messageIt.hasNext()) {
- MessageAndOffset msgAndOffset = _messageIt.next();
+ MessageAndOffset messageAndOffset = _messageIt.next();
- ByteBuffer buf = msgAndOffset.message().payload();
+ ByteBuffer buf = messageAndOffset.message().payload();
int origSize = buf.remaining();
byte[] bytes = new byte[origSize];
- buf.get(bytes, buf.position(), origSize);
+ buf.get(bytes, buf.position(), origSize);
value.set(bytes, 0, origSize);
- key.set(_index, _offset, msgAndOffset.message().checksum());
+ key.set(_index, _offset, messageAndOffset.message().checksum());
- _offset = msgAndOffset.offset(); //increase offset
+ _offset = messageAndOffset.offset(); //increase offset
_count ++; //increase count
return true;
Modified: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java (original)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java Sat Apr 7 00:04:51 2012
@@ -16,6 +16,7 @@
*/
package kafka.etl;
+
import java.io.IOException;
import java.net.URI;
import java.util.Map;
@@ -23,13 +24,13 @@ import kafka.consumer.SimpleConsumer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
@SuppressWarnings("deprecation")
Modified: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java (original)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java Sat Apr 7 00:04:51 2012
@@ -16,13 +16,13 @@
*/
package kafka.etl;
+
import java.net.URI;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
Modified: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java (original)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java Sat Apr 7 00:04:51 2012
@@ -16,11 +16,11 @@
*/
package kafka.etl;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
-import kafka.etl.KafkaETLKey;
public class KafkaETLKey implements WritableComparable<KafkaETLKey>{
Modified: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java (original)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java Sat Apr 7 00:04:51 2012
@@ -17,6 +17,7 @@
package kafka.etl;
+
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -33,7 +34,6 @@ import java.util.Arrays;
import java.util.Enumeration;
import java.util.List;
import java.util.Properties;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Modified: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java (original)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java Sat Apr 7 00:04:51 2012
@@ -17,32 +17,24 @@
package kafka.etl.impl;
-import java.io.IOException;
+
import java.net.URI;
-import java.net.URISyntaxException;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
-import java.util.Random;
-import java.util.Map.Entry;
import java.util.Properties;
-
-import kafka.message.NoCompressionCodec;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.JobConf;
-
+import java.util.Random;
import kafka.etl.KafkaETLKey;
import kafka.etl.KafkaETLRequest;
-import kafka.etl.KafkaETLUtils;
import kafka.etl.Props;
import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
import kafka.javaapi.producer.SyncProducer;
+import kafka.message.Message;
import kafka.producer.SyncProducerConfig;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
/**
* Use this class to produce test events to Kafka server. Each event contains a
Modified: incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java (original)
+++ incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java Sat Apr 7 00:04:51 2012
@@ -16,8 +16,9 @@
*/
package kafka.bridge.examples;
-import kafka.bridge.hadoop.KafkaOutputFormat;
+import java.io.IOException;
+import kafka.bridge.hadoop.KafkaOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
@@ -27,8 +28,6 @@ import org.apache.hadoop.mapreduce.Mappe
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import java.io.IOException;
-
public class TextPublisher
{
public static void main(String[] args) throws Exception
Modified: incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java (original)
+++ incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java Sat Apr 7 00:04:51 2012
@@ -16,24 +16,26 @@
*/
package kafka.bridge.hadoop;
-import java.util.Properties;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.message.Message;
import kafka.producer.ProducerConfig;
-
-import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.log4j.Logger;
-import java.io.IOException;
-import java.net.URI;
-
public class KafkaOutputFormat<W extends BytesWritable> extends OutputFormat<NullWritable, W>
{
private Logger log = Logger.getLogger(KafkaOutputFormat.class);
Modified: incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java (original)
+++ incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java Sat Apr 7 00:04:51 2012
@@ -16,19 +16,18 @@
*/
package kafka.bridge.hadoop;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.message.Message;
-
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
public class KafkaRecordWriter<W extends BytesWritable> extends RecordWriter<NullWritable, W>
{
protected Producer<Integer, Message> producer;
Modified: incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java (original)
+++ incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java Sat Apr 7 00:04:51 2012
@@ -16,9 +16,12 @@
*/
package kafka.bridge.pig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
import kafka.bridge.hadoop.KafkaOutputFormat;
import kafka.bridge.hadoop.KafkaRecordWriter;
-
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Encoder;
import org.apache.hadoop.fs.Path;
@@ -33,10 +36,6 @@ import org.apache.pig.data.Tuple;
import org.apache.pig.piggybank.storage.avro.PigAvroDatumWriter;
import org.apache.pig.piggybank.storage.avro.PigSchema2Avro;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
public class AvroKafkaStorage extends StoreFunc
{
protected KafkaRecordWriter writer;
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala Sat Apr 7 00:04:51 2012
@@ -17,8 +17,6 @@
package kafka
-import consumer.ConsumerConfig
-import producer.ProducerConfig
import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
import utils.{Utils, Logging}
import org.apache.log4j.jmx.LoggerDynamicMBean
@@ -30,8 +28,8 @@ object Kafka extends Logging {
import org.apache.log4j.Logger
Utils.registerMBean(new LoggerDynamicMBean(Logger.getRootLogger()), kafkaLog4jMBeanName)
- if (!List(1, 3).contains(args.length)) {
- println("USAGE: java [options] %s server.properties [consumer.properties producer.properties]".format(classOf[KafkaServer].getSimpleName()))
+ if (args.length != 1) {
+ println("USAGE: java [options] %s server.properties".format(classOf[KafkaServer].getSimpleName()))
System.exit(1)
}
@@ -39,14 +37,7 @@ object Kafka extends Logging {
val props = Utils.loadProps(args(0))
val serverConfig = new KafkaConfig(props)
- val kafkaServerStartble = args.length match {
- case 3 =>
- val consumerConfig = new ConsumerConfig(Utils.loadProps(args(1)))
- val producerConfig = new ProducerConfig(Utils.loadProps(args(2)))
- new KafkaServerStartable(serverConfig, consumerConfig, producerConfig)
- case 1 =>
- new KafkaServerStartable(serverConfig)
- }
+ val kafkaServerStartble = new KafkaServerStartable(serverConfig)
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread() {
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Sat Apr 7 00:04:51 2012
@@ -36,11 +36,19 @@ object ConsoleConsumer extends Logging {
def main(args: Array[String]) {
val parser = new OptionParser
- val topicIdOpt = parser.accepts("topic", "REQUIRED: The topic id to consume on.")
+ val topicIdOpt = parser.accepts("topic", "The topic id to consume on.")
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
- val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+ val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.")
+ .withRequiredArg
+ .describedAs("whitelist")
+ .ofType(classOf[String])
+ val blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to exclude from consumption.")
+ .withRequiredArg
+ .describedAs("blacklist")
+ .ofType(classOf[String])
+ val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
"Multiple URLS can be given to allow fail-over.")
.withRequiredArg
.describedAs("urls")
@@ -90,8 +98,20 @@ object ConsoleConsumer extends Logging {
"skip it instead of halt.")
val options: OptionSet = tryParse(parser, args)
- checkRequiredArgs(parser, options, topicIdOpt, zkConnectOpt)
+ Utils.checkRequiredArgs(parser, options, zkConnectOpt)
+ val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
+ if (topicOrFilterOpt.size != 1) {
+ error("Exactly one of whitelist/blacklist/topic is required.")
+ parser.printHelpOn(System.err)
+ System.exit(1)
+ }
+ val topicArg = options.valueOf(topicOrFilterOpt.head)
+ val filterSpec = if (options.has(blacklistOpt))
+ new Blacklist(topicArg)
+ else
+ new Whitelist(topicArg)
+
val props = new Properties()
props.put("groupid", options.valueOf(groupIdOpt))
props.put("socket.buffersize", options.valueOf(socketBufferSizeOpt).toString)
@@ -104,7 +124,6 @@ object ConsoleConsumer extends Logging {
val config = new ConsumerConfig(props)
val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
- val topic = options.valueOf(topicIdOpt)
val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
val formatterArgs = tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))
@@ -123,21 +142,20 @@ object ConsoleConsumer extends Logging {
tryCleanupZookeeper(options.valueOf(zkConnectOpt), options.valueOf(groupIdOpt))
}
})
-
- var stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).get.get(0)
- val iter =
- if(maxMessages >= 0)
- stream.slice(0, maxMessages)
- else
- stream
+
+ val stream = connector.createMessageStreamsByFilter(filterSpec).get(0)
+ val iter = if(maxMessages >= 0)
+ stream.slice(0, maxMessages)
+ else
+ stream
val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
formatter.init(formatterArgs)
try {
- for(message <- iter) {
+ for(messageAndTopic <- iter) {
try {
- formatter.writeTo(message, System.out)
+ formatter.writeTo(messageAndTopic.message, System.out)
} catch {
case e =>
if (skipMessageOnError)
@@ -173,16 +191,6 @@ object ConsoleConsumer extends Logging {
}
}
- def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
- for(arg <- required) {
- if(!options.has(arg)) {
- error("Missing required argument \"" + arg + "\"")
- parser.printHelpOn(System.err)
- System.exit(1)
- }
- }
- }
-
def tryParseFormatterArgs(args: Iterable[String]): Properties = {
val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
if(!splits.forall(_.length == 2)) {
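
For reference, the new options are mutually exclusive with --topic: exactly one of --topic, --whitelist or --blacklist must be given, and --blacklist yields a Blacklist filter while the other two yield a Whitelist. Assuming the usual bin/kafka-console-consumer.sh wrapper (not part of this patch), usage looks roughly like the following; the connect string and patterns are placeholders:

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --whitelist 'white.*'
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --blacklist 'black.*'
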
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala Sat Apr 7 00:04:51 2012
@@ -20,7 +20,6 @@ package kafka.consumer
import java.util.Properties
import kafka.utils.{ZKConfig, Utils}
import kafka.api.OffsetRequest
-import kafka.common.InvalidConfigException
object ConsumerConfig {
val SocketTimeout = 30 * 1000
val SocketBufferSize = 64*1024
@@ -90,27 +89,10 @@ class ConsumerConfig(props: Properties)
/** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
val consumerTimeoutMs = Utils.getInt(props, "consumer.timeout.ms", ConsumerTimeoutMs)
- /** Whitelist of topics for this mirror's embedded consumer to consume. At
- * most one of whitelist/blacklist may be specified. */
- val mirrorTopicsWhitelist = Utils.getString(
- props, MirrorTopicsWhitelistProp, MirrorTopicsWhitelist)
-
- /** Topics to skip mirroring. At most one of whitelist/blacklist may be
- * specified */
- val mirrorTopicsBlackList = Utils.getString(
- props, MirrorTopicsBlacklistProp, MirrorTopicsBlacklist)
-
- if (mirrorTopicsWhitelist.nonEmpty && mirrorTopicsBlackList.nonEmpty)
- throw new InvalidConfigException("The embedded consumer's mirror topics configuration can only contain one of blacklist or whitelist")
-
- val mirrorConsumerNumThreads = Utils.getInt(
- props, MirrorConsumerNumThreadsProp, MirrorConsumerNumThreads)
-
/** Use shallow iterator over compressed messages directly. This feature should be used very carefully.
* Typically, it's only used for mirroring raw messages from one kafka cluster to another to save the
* overhead of decompression.
* */
val enableShallowIterator = Utils.getBoolean(props, "shallowiterator.enable", false)
-
}
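
For example, a mirroring consumer that should pass compressed message sets through untouched would turn the flag on in its consumer properties. A minimal sketch (the groupid and zk.connect values are placeholders):

    val props = new java.util.Properties()
    props.put("groupid", "mirror-group")
    props.put("zk.connect", "source-zk:2181")
    // hand compressed message sets downstream without decompressing them
    props.put("shallowiterator.enable", "true")
    val config = new ConsumerConfig(props)
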
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala Sat Apr 7 00:04:51 2012
@@ -29,12 +29,28 @@ trait ConsumerConnector {
* Create a list of MessageStreams for each topic.
*
* @param topicCountMap a map of (topic, #streams) pair
- * @return a map of (topic, list of KafkaMessageStream) pair. The number of items in the
- * list is #streams. Each KafkaMessageStream supports an iterator of messages.
+ * @param decoder Decoder to decode each Message to type T
+ * @return a map of (topic, list of KafkaStream) pairs.
+ * The number of items in the list is #streams. Each stream supports
+ * an iterator over message/metadata pairs.
*/
def createMessageStreams[T](topicCountMap: Map[String,Int],
decoder: Decoder[T] = new DefaultDecoder)
- : Map[String,List[KafkaMessageStream[T]]]
+ : Map[String,List[KafkaStream[T]]]
+
+ /**
+ * Create a list of message streams for all topics that match a given filter.
+ *
+ * @param topicFilter Either a Whitelist or Blacklist TopicFilter object.
+ * @param numStreams Number of streams to return
+ * @param decoder Decoder to decode each Message to type T
+ * @return a list of KafkaStream each of which provides an
+ * iterator over message/metadata pairs over allowed topics.
+ */
+ def createMessageStreamsByFilter[T](topicFilter: TopicFilter,
+ numStreams: Int = 1,
+ decoder: Decoder[T] = new DefaultDecoder)
+ : Seq[KafkaStream[T]]
/**
* Commit the offsets of all broker partitions connected by this connector.
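
A minimal sketch of the new filter-based consumption path, assuming kafka.consumer.Consumer.create as the connector factory (that factory is outside the hunks shown here):

    import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}

    val props = new java.util.Properties()
    props.put("groupid", "demo")
    props.put("zk.connect", "localhost:2181")
    val connector = Consumer.create(new ConsumerConfig(props))

    // two streams over every topic matching the whitelist regex
    val streams = connector.createMessageStreamsByFilter(new Whitelist("white.*"), 2)
    for (stream <- streams; msgAndMeta <- stream)
      println(msgAndMeta.message) // a kafka.message.Message under the default decoder
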
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Sat Apr 7 00:04:51 2012
@@ -19,37 +19,38 @@ package kafka.consumer
import kafka.utils.{IteratorTemplate, Logging}
import java.util.concurrent.{TimeUnit, BlockingQueue}
-import kafka.message.MessageAndOffset
import kafka.serializer.Decoder
import java.util.concurrent.atomic.AtomicReference
+import kafka.message.{MessageAndOffset, MessageAndMetadata}
+
/**
* An iterator that blocks until a value can be read from the supplied queue.
* The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
*
*/
-class ConsumerIterator[T](private val topic: String,
- private val channel: BlockingQueue[FetchedDataChunk],
+class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk],
consumerTimeoutMs: Int,
private val decoder: Decoder[T],
val enableShallowIterator: Boolean)
- extends IteratorTemplate[T] with Logging {
+ extends IteratorTemplate[MessageAndMetadata[T]] with Logging {
private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
private var currentTopicInfo:PartitionTopicInfo = null
private var consumedOffset: Long = -1L
- override def next(): T = {
- val decodedMessage = super.next()
+ override def next(): MessageAndMetadata[T] = {
+ val item = super.next()
if(consumedOffset < 0)
throw new IllegalStateException("Offset returned by the message set is invalid %d".format(consumedOffset))
currentTopicInfo.resetConsumeOffset(consumedOffset)
- trace("Setting consumed offset to %d".format(consumedOffset))
+ val topic = currentTopicInfo.topic
+ trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1)
- decodedMessage
+ item
}
- protected def makeNext(): T = {
+ protected def makeNext(): MessageAndMetadata[T] = {
var currentDataChunk: FetchedDataChunk = null
// if we don't have an iterator, get one
var localCurrent = current.get()
@@ -82,10 +83,11 @@ class ConsumerIterator[T](private val to
}
val item = localCurrent.next()
consumedOffset = item.offset
- decoder.toEvent(item.message)
+
+ new MessageAndMetadata(decoder.toEvent(item.message), currentTopicInfo.topic)
}
- def clearCurrentChunk() = {
+ def clearCurrentChunk() {
try {
info("Clearing the current data chunk for this consumer iterator")
current.set(null)
@@ -94,3 +96,4 @@ class ConsumerIterator[T](private val to
}
class ConsumerTimeoutException() extends RuntimeException()
+
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala Sat Apr 7 00:04:51 2012
@@ -42,24 +42,24 @@ private [consumer] class Fetcher(val con
fetcherThreads = EMPTY_FETCHER_THREADS
}
- def clearFetcherQueues[T](topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
+ def clearFetcherQueues(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]],
- kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) {
+ messageStreams: Map[String,List[KafkaStream[_]]]) {
// Clear all but the currently iterated upon chunk in the consumer thread's queue
queuesTobeCleared.foreach(_.clear)
info("Cleared all relevant queues for this fetcher")
// Also clear the currently iterated upon chunk in the consumer threads
- if(kafkaMessageStreams != null)
- kafkaMessageStreams.foreach(_._2.foreach(s => s.clear()))
+ if(messageStreams != null)
+ messageStreams.foreach(_._2.foreach(s => s.clear()))
info("Cleared the data chunks in all the consumer message iterators")
}
- def startConnections[T](topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
- kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) {
+ def startConnections(topicInfos: Iterable[PartitionTopicInfo],
+ cluster: Cluster) {
if (topicInfos == null)
return
Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaStream.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaStream.scala?rev=1310645&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaStream.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaStream.scala Sat Apr 7 00:04:51 2012
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+
+import java.util.concurrent.BlockingQueue
+import kafka.serializer.Decoder
+import kafka.message.MessageAndMetadata
+
+class KafkaStream[T](private val queue: BlockingQueue[FetchedDataChunk],
+ consumerTimeoutMs: Int,
+ private val decoder: Decoder[T],
+ val enableShallowIterator: Boolean)
+ extends Iterable[MessageAndMetadata[T]] with java.lang.Iterable[MessageAndMetadata[T]] {
+
+ private val iter: ConsumerIterator[T] =
+ new ConsumerIterator[T](queue, consumerTimeoutMs, decoder, enableShallowIterator)
+
+ /**
+ * Create an iterator over messages in the stream.
+ */
+ def iterator(): ConsumerIterator[T] = iter
+
+ /**
+ * This method clears the queue being iterated during the consumer rebalancing. This is mainly
+ * to reduce the number of duplicates received by the consumer
+ */
+ def clear() {
+ iter.clearCurrentChunk()
+ }
+
+}
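
Since the underlying ConsumerIterator blocks, a consumer that sets consumer.timeout.ms can bound the wait and handle the resulting ConsumerTimeoutException (defined in ConsumerIterator above); a sketch against one of the streams created in the earlier example:

    val it = stream.iterator()
    try {
      while (it.hasNext())
        println(it.next().message)
    } catch {
      case e: ConsumerTimeoutException =>
        // no message arrived within consumer.timeout.ms; shut down or retry
    }
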
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala Sat Apr 7 00:04:51 2012
@@ -19,35 +19,18 @@ package kafka.consumer
import scala.collection._
import scala.util.parsing.json.JSON
-import kafka.utils.Logging
+import org.I0Itec.zkclient.ZkClient
+import java.util.regex.Pattern
+import kafka.utils.{ZKGroupDirs, ZkUtils, Logging}
-private[kafka] object TopicCount extends Logging {
- val myConversionFunc = {input : String => input.toInt}
- JSON.globalNumberParser = myConversionFunc
-
- def constructTopicCount(consumerIdSting: String, jsonString : String) : TopicCount = {
- var topMap : Map[String,Int] = null
- try {
- JSON.parseFull(jsonString) match {
- case Some(m) => topMap = m.asInstanceOf[Map[String,Int]]
- case None => throw new RuntimeException("error constructing TopicCount : " + jsonString)
- }
- }
- catch {
- case e =>
- error("error parsing consumer json string " + jsonString, e)
- throw e
- }
-
- new TopicCount(consumerIdSting, topMap)
- }
-
-}
-private[kafka] class TopicCount(val consumerIdString: String, val topicCountMap: Map[String, Int]) {
-
- def getConsumerThreadIdsPerTopic()
- : Map[String, Set[String]] = {
+private[kafka] trait TopicCount {
+ def getConsumerThreadIdsPerTopic: Map[String, Set[String]]
+
+ def dbString: String
+
+ protected def makeConsumerThreadIdsPerTopic(consumerIdString: String,
+ topicCountMap: Map[String, Int]) = {
val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[String]]()
for ((topic, nConsumers) <- topicCountMap) {
val consumerSet = new mutable.HashSet[String]
@@ -58,11 +41,96 @@ private[kafka] class TopicCount(val cons
}
consumerThreadIdsPerTopicMap
}
+}
+
+private[kafka] object TopicCount extends Logging {
+
+ /*
+ * Example of whitelist topic count stored in ZooKeeper:
+ * Topics with whitetopic as prefix, and four streams: *4*whitetopic.*
+ *
+ * Example of blacklist topic count stored in ZooKeeper:
+ * Topics with blacktopic as prefix, and four streams: !4!blacktopic.*
+ */
+
+ val WHITELIST_MARKER = "*"
+ val BLACKLIST_MARKER = "!"
+ private val WHITELIST_PATTERN =
+ Pattern.compile("""\*(\p{Digit}+)\*(.*)""")
+ private val BLACKLIST_PATTERN =
+ Pattern.compile("""!(\p{Digit}+)!(.*)""")
+
+ val myConversionFunc = {input : String => input.toInt}
+ JSON.globalNumberParser = myConversionFunc
+
+ def constructTopicCount(group: String,
+ consumerId: String,
+ zkClient: ZkClient) : TopicCount = {
+ val dirs = new ZKGroupDirs(group)
+ val topicCountString = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)
+ val hasWhitelist = topicCountString.startsWith(WHITELIST_MARKER)
+ val hasBlacklist = topicCountString.startsWith(BLACKLIST_MARKER)
+
+ if (hasWhitelist || hasBlacklist)
+ info("Constructing topic count for %s from %s using %s as pattern."
+ .format(consumerId, topicCountString,
+ if (hasWhitelist) WHITELIST_PATTERN else BLACKLIST_PATTERN))
+
+ if (hasWhitelist || hasBlacklist) {
+ val matcher = if (hasWhitelist)
+ WHITELIST_PATTERN.matcher(topicCountString)
+ else
+ BLACKLIST_PATTERN.matcher(topicCountString)
+ require(matcher.matches())
+ val numStreams = matcher.group(1).toInt
+ val regex = matcher.group(2)
+ val filter = if (hasWhitelist)
+ new Whitelist(regex)
+ else
+ new Blacklist(regex)
+
+ new WildcardTopicCount(zkClient, consumerId, filter, numStreams)
+ }
+ else {
+ var topMap : Map[String,Int] = null
+ try {
+ JSON.parseFull(topicCountString) match {
+ case Some(m) => topMap = m.asInstanceOf[Map[String,Int]]
+ case None => throw new RuntimeException("error constructing TopicCount : " + topicCountString)
+ }
+ }
+ catch {
+ case e =>
+ error("error parsing consumer json string " + topicCountString, e)
+ throw e
+ }
+
+ new StaticTopicCount(consumerId, topMap)
+ }
+ }
+
+ def constructTopicCount(consumerIdString: String, topicCount: Map[String, Int]) =
+ new StaticTopicCount(consumerIdString, topicCount)
+
+ def constructTopicCount(consumerIdString: String,
+ filter: TopicFilter,
+ numStreams: Int,
+ zkClient: ZkClient) =
+ new WildcardTopicCount(zkClient, consumerIdString, filter, numStreams)
+
+}
+
+private[kafka] class StaticTopicCount(val consumerIdString: String,
+ val topicCountMap: Map[String, Int])
+ extends TopicCount {
+
+ def getConsumerThreadIdsPerTopic =
+ makeConsumerThreadIdsPerTopic(consumerIdString, topicCountMap)
override def equals(obj: Any): Boolean = {
obj match {
case null => false
- case n: TopicCount => consumerIdString == n.consumerIdString && topicCountMap == n.topicCountMap
+ case n: StaticTopicCount => consumerIdString == n.consumerIdString && topicCountMap == n.topicCountMap
case _ => false
}
}
@@ -73,7 +141,7 @@ private[kafka] class TopicCount(val cons
* "topic2" : 4
* }
*/
- def toJsonString() : String = {
+ def dbString = {
val builder = new StringBuilder
builder.append("{ ")
var i = 0
@@ -84,6 +152,29 @@ private[kafka] class TopicCount(val cons
i += 1
}
builder.append(" }")
- builder.toString
+ builder.toString()
}
}
+
+private[kafka] class WildcardTopicCount(zkClient: ZkClient,
+ consumerIdString: String,
+ topicFilter: TopicFilter,
+ numStreams: Int) extends TopicCount {
+ def getConsumerThreadIdsPerTopic = {
+ val wildcardTopics = ZkUtils.getChildrenParentMayNotExist(
+ zkClient, ZkUtils.BrokerTopicsPath).filter(topicFilter.isTopicAllowed(_))
+ makeConsumerThreadIdsPerTopic(consumerIdString,
+ Map(wildcardTopics.map((_, numStreams)): _*))
+ }
+
+ def dbString = {
+ val marker = topicFilter match {
+ case wl: Whitelist => TopicCount.WHITELIST_MARKER
+ case bl: Blacklist => TopicCount.BLACKLIST_MARKER
+ }
+
+ "%s%d%s%s".format(marker, numStreams, marker, topicFilter.regex)
+ }
+
+}
+
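
To illustrate the encoding described in the comment above (zkClient is assumed to be an already-connected ZkClient):

    // wildcard counts carry the marker, the stream count and the regex
    val wildcard = TopicCount.constructTopicCount(
      "group1_consumer1", new Whitelist("whitetopic.*"), 4, zkClient)
    println(wildcard.dbString) // *4*whitetopic.*

    // static counts keep the original JSON form
    val static = TopicCount.constructTopicCount("group1_consumer1", Map("topic1" -> 2))
    println(static.dbString) // { "topic1": 2 }
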
Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicFilter.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicFilter.scala?rev=1310645&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicFilter.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicFilter.scala Sat Apr 7 00:04:51 2012
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+
+import kafka.utils.Logging
+import java.util.regex.{PatternSyntaxException, Pattern}
+
+
+sealed abstract class TopicFilter(rawRegex: String) extends Logging {
+
+ val regex = rawRegex
+ .trim
+ .replace(',', '|')
+ .replace(" ", "")
+ .replaceAll("""^["']+""","")
+ .replaceAll("""["']+$""","") // property files may bring quotes
+
+ try {
+ Pattern.compile(regex)
+ }
+ catch {
+ case e: PatternSyntaxException =>
+ throw new RuntimeException(regex + " is an invalid regex.")
+ }
+
+ override def toString = regex
+
+ def requiresTopicEventWatcher: Boolean
+
+ def isTopicAllowed(topic: String): Boolean
+}
+
+case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
+ override def requiresTopicEventWatcher = !regex.matches("""[\p{Alnum}-|]+""")
+
+ override def isTopicAllowed(topic: String) = {
+ val allowed = topic.matches(regex)
+
+ debug("%s %s".format(
+ topic, if (allowed) "allowed" else "filtered"))
+
+ allowed
+ }
+
+
+}
+
+case class Blacklist(rawRegex: String) extends TopicFilter(rawRegex) {
+ override def requiresTopicEventWatcher = true
+
+ override def isTopicAllowed(topic: String) = {
+ val allowed = !topic.matches(regex)
+
+ debug("%s %s".format(
+ topic, if (allowed) "allowed" else "filtered"))
+
+ allowed
+ }
+}
+
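
The filters normalize a comma-separated list into a single regex and then match topic names against it; for example:

    val white = new Whitelist("white1,white2")
    white.regex                     // "white1|white2": commas become alternation
    white.isTopicAllowed("white2")  // true
    white.requiresTopicEventWatcher // false: a plain alternation of literal names

    val black = new Blacklist("black.*")
    black.isTopicAllowed("blacktopic1") // false
    black.requiresTopicEventWatcher     // always true for blacklists
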
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Sat Apr 7 00:04:51 2012
@@ -34,6 +34,7 @@ import kafka.common.{ConsumerRebalanceFa
import java.lang.IllegalStateException
import kafka.utils.ZkUtils._
+
/**
* This class handles the consumers interaction with zookeeper
*
@@ -86,16 +87,37 @@ trait ZookeeperConsumerConnectorMBean {
private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val enableFetcher: Boolean) // for testing only
- extends ConsumerConnector with ZookeeperConsumerConnectorMBean with Logging {
-
+ extends ConsumerConnector with ZookeeperConsumerConnectorMBean
+ with Logging {
private val isShuttingDown = new AtomicBoolean(false)
private val rebalanceLock = new Object
private var fetcher: Option[Fetcher] = None
private var zkClient: ZkClient = null
private var topicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
- // queues : (topic,consumerThreadId) -> queue
- private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
+ // topicThreadIdAndQueues : (topic,consumerThreadId) -> queue
+ private val topicThreadIdAndQueues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false)
+ private val messageStreamCreated = new AtomicBoolean(false)
+
+ private var sessionExpirationListener: ZKSessionExpireListener = null
+ private var loadBalancerListener: ZKRebalancerListener = null
+
+ private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
+
+ val consumerIdString = {
+ var consumerUuid : String = null
+ config.consumerId match {
+ case Some(consumerId) // for testing only
+ => consumerUuid = consumerId
+ case None // generate unique consumerId automatically
+ => val uuid = UUID.randomUUID()
+ consumerUuid = "%s-%d-%s".format(
+ InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
+ uuid.getMostSignificantBits().toHexString.substring(0,8))
+ }
+ config.groupId + "_" + consumerUuid
+ }
+ this.logIdent = consumerIdString + " "
connectZk()
createFetcher()
@@ -108,10 +130,18 @@ private[kafka] class ZookeeperConsumerCo
def createMessageStreams[T](topicCountMap: Map[String,Int],
decoder: Decoder[T])
- : Map[String,List[KafkaMessageStream[T]]] = {
+ : Map[String,List[KafkaStream[T]]] = {
+ if (messageStreamCreated.getAndSet(true))
+ throw new RuntimeException(this.getClass.getSimpleName +
+ " can create message streams at most once")
consume(topicCountMap, decoder)
}
+ def createMessageStreamsByFilter[T](topicFilter: TopicFilter, numStreams: Int, decoder: Decoder[T]) = {
+ val wildcardStreamsHandler = new WildcardStreamsHandler[T](topicFilter, numStreams, decoder)
+ wildcardStreamsHandler.streams
+ }
+
private def createFetcher() {
if (enableFetcher)
fetcher = Some(new Fetcher(config, zkClient))
@@ -126,6 +156,9 @@ private[kafka] class ZookeeperConsumerCo
val canShutdown = isShuttingDown.compareAndSet(false, true);
if (canShutdown) {
info("ZKConsumerConnector shutting down")
+
+ if (wildcardTopicWatcher != null)
+ wildcardTopicWatcher.shutdown()
try {
scheduler.shutdownNow()
fetcher match {
@@ -150,69 +183,42 @@ private[kafka] class ZookeeperConsumerCo
def consume[T](topicCountMap: scala.collection.Map[String,Int],
decoder: Decoder[T])
- : Map[String,List[KafkaMessageStream[T]]] = {
+ : Map[String,List[KafkaStream[T]]] = {
debug("entering consume ")
if (topicCountMap == null)
throw new RuntimeException("topicCountMap is null")
- val dirs = new ZKGroupDirs(config.groupId)
- var ret = new mutable.HashMap[String,List[KafkaMessageStream[T]]]
+ val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)
- var consumerUuid : String = null
- config.consumerId match {
- case Some(consumerId) // for testing only
- => consumerUuid = consumerId
- case None // generate unique consumerId automatically
- => val uuid = UUID.randomUUID()
- consumerUuid = "%s-%d-%s".format(
- InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
- uuid.getMostSignificantBits().toHexString.substring(0,8))
- }
- val consumerIdString = config.groupId + "_" + consumerUuid
- val topicCount = new TopicCount(consumerIdString, topicCountMap)
-
- // create a queue per topic per consumer thread
- val consumerThreadIdsPerTopic = topicCount.getConsumerThreadIdsPerTopic
- for ((topic, threadIdSet) <- consumerThreadIdsPerTopic) {
- var streamList: List[KafkaMessageStream[T]] = Nil
- for (threadId <- threadIdSet) {
- val stream = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
- queues.put((topic, threadId), stream)
- streamList ::= new KafkaMessageStream[T](topic, stream, config.consumerTimeoutMs, decoder, config.enableShallowIterator)
- }
- ret += (topic -> streamList)
- debug("adding topic " + topic + " and stream to map..")
- }
-
- // listener to consumer and partition changes
- val loadBalancerListener = new ZKRebalancerListener[T](config.groupId, consumerIdString, ret)
- registerConsumerInZK(dirs, consumerIdString, topicCount)
+ val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic
- // register listener for session expired event
- zkClient.subscribeStateChanges(
- new ZKSessionExpireListener[T](dirs, consumerIdString, topicCount, loadBalancerListener))
-
- zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
+ // make a list of (queue,stream) pairs, one pair for each threadId
+ val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
+ threadIdSet.map(_ => {
+ val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
+ val stream = new KafkaStream[T](
+ queue, config.consumerTimeoutMs, decoder, config.enableShallowIterator)
+ (queue, stream)
+ })
+ ).flatten.toList
- ret.foreach { topicAndStreams =>
- // register on broker partition path changes
- val partitionPath = BrokerTopicsPath + "/" + topicAndStreams._1
- zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
- }
+ val dirs = new ZKGroupDirs(config.groupId)
+ registerConsumerInZK(dirs, consumerIdString, topicCount)
+ reinitializeConsumer(topicCount, queuesAndStreams)
- // explicitly trigger load balancing for this consumer
- loadBalancerListener.syncedRebalance()
- ret
+ loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[T]]]]
}
- private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
+ private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) {
info("begin registering consumer " + consumerIdString + " in ZK")
- createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, topicCount.toJsonString)
+ createEphemeralPathExpectConflict(zkClient,
+ dirs.consumerRegistryDir + "/" + consumerIdString,
+ topicCount.dbString)
info("end registering consumer " + consumerIdString + " in ZK")
}
private def sendShutdownToAllQueues() = {
- for (queue <- queues.values) {
+ for (queue <- topicThreadIdAndQueues.values) {
debug("Clearing up queue")
queue.clear()
queue.put(ZookeeperConsumerConnector.shutdownCommand)
@@ -334,10 +340,10 @@ private[kafka] class ZookeeperConsumerCo
producedOffset
}
- class ZKSessionExpireListener[T](val dirs: ZKGroupDirs,
+ class ZKSessionExpireListener(val dirs: ZKGroupDirs,
val consumerIdString: String,
val topicCount: TopicCount,
- val loadBalancerListener: ZKRebalancerListener[T])
+ val loadBalancerListener: ZKRebalancerListener)
extends IZkStateListener {
@throws(classOf[Exception])
def handleStateChanged(state: KeeperState) {
@@ -359,10 +365,10 @@ private[kafka] class ZookeeperConsumerCo
* consumer in the consumer registry and trigger a rebalance.
*/
info("ZK expired; release old broker parition ownership; re-register consumer " + consumerIdString)
- loadBalancerListener.resetState
+ loadBalancerListener.resetState()
registerConsumerInZK(dirs, consumerIdString, topicCount)
// explicitly trigger load balancing for this consumer
- loadBalancerListener.syncedRebalance
+ loadBalancerListener.syncedRebalance()
// There is no need to resubscribe to child and state changes.
// The child change watchers will be set inside rebalance when we read the children list.
@@ -370,8 +376,8 @@ private[kafka] class ZookeeperConsumerCo
}
- class ZKRebalancerListener[T](val group: String, val consumerIdString: String,
- kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]])
+ class ZKRebalancerListener(val group: String, val consumerIdString: String,
+ val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_]]])
extends IZkChildListener {
private var isWatcherTriggered = false
private val lock = new ReentrantLock
@@ -459,7 +465,7 @@ private[kafka] class ZookeeperConsumerCo
info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
}
// stop all fetchers and clear all the queues to avoid data duplication
- closeFetchersForQueues(cluster, kafkaMessageStreams, queues.map(q => q._2))
+ closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
Thread.sleep(config.rebalanceBackoffMs)
}
}
@@ -468,7 +474,7 @@ private[kafka] class ZookeeperConsumerCo
}
private def rebalance(cluster: Cluster): Boolean = {
- val myTopicThreadIdsMap = getTopicCount(zkClient, group, consumerIdString).getConsumerThreadIdsPerTopic
+ val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
val partitionsPerTopicMap = getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keys.iterator)
@@ -478,7 +484,7 @@ private[kafka] class ZookeeperConsumerCo
* But if we don't stop the fetchers first, this consumer would continue returning data for released
* partitions in parallel. So, not stopping the fetchers leads to duplicate data.
*/
- closeFetchers(cluster, kafkaMessageStreams, myTopicThreadIdsMap)
+ closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
releasePartitionOwnership(topicRegistry)
@@ -531,7 +537,7 @@ private[kafka] class ZookeeperConsumerCo
debug("Partitions per topic cache " + partitionsPerTopicMap)
debug("Consumers per topic cache " + consumersPerTopicMap)
topicRegistry = currentTopicRegistry
- updateFetcher(cluster, kafkaMessageStreams)
+ updateFetcher(cluster)
true
}else {
false
@@ -539,12 +545,12 @@ private[kafka] class ZookeeperConsumerCo
}
private def closeFetchersForQueues(cluster: Cluster,
- kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]],
+ messageStreams: Map[String,List[KafkaStream[_]]],
queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
var allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
fetcher match {
case Some(f) => f.stopConnectionsToAllBrokers
- f.clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, kafkaMessageStreams)
+ f.clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
info("Committing all offsets after clearing the fetcher queues")
/**
* here, we need to commit offsets before stopping the consumer from returning any more messages
@@ -559,16 +565,15 @@ private[kafka] class ZookeeperConsumerCo
}
}
- private def closeFetchers(cluster: Cluster, kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]],
+ private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_]]],
relevantTopicThreadIdsMap: Map[String, Set[String]]) {
// only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer
// after this rebalancing attempt
- val queuesTobeCleared = queues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2)
- closeFetchersForQueues(cluster, kafkaMessageStreams, queuesTobeCleared)
+ val queuesTobeCleared = topicThreadIdAndQueues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2)
+ closeFetchersForQueues(cluster, messageStreams, queuesTobeCleared)
}
- private def updateFetcher[T](cluster: Cluster,
- kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) {
+ private def updateFetcher(cluster: Cluster) {
// update partitions for fetcher
var allPartitionInfos : List[PartitionTopicInfo] = Nil
for (partitionInfos <- topicRegistry.values)
@@ -579,7 +584,7 @@ private[kafka] class ZookeeperConsumerCo
fetcher match {
case Some(f) =>
- f.startConnections(allPartitionInfos, cluster, kafkaMessageStreams)
+ f.startConnections(allPartitionInfos, cluster)
case None =>
}
}
@@ -637,7 +642,7 @@ private[kafka] class ZookeeperConsumerCo
}
else
offset = offsetString.toLong
- val queue = queues.get((topic, consumerThreadId))
+ val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
val consumedOffset = new AtomicLong(offset)
val fetchedOffset = new AtomicLong(offset)
val partTopicInfo = new PartitionTopicInfo(topic,
@@ -651,5 +656,155 @@ private[kafka] class ZookeeperConsumerCo
debug(partTopicInfo + " selected new offset " + offset)
}
}
+
+ private def reinitializeConsumer[T](
+ topicCount: TopicCount,
+ queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[T])]) {
+
+ val dirs = new ZKGroupDirs(config.groupId)
+
+ // listener to consumer and partition changes
+ if (loadBalancerListener == null) {
+ val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[T]]]
+ loadBalancerListener = new ZKRebalancerListener(
+ config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_]]]])
+ }
+
+ // register listener for session expired event
+ if (sessionExpirationListener == null)
+ sessionExpirationListener = new ZKSessionExpireListener(
+ dirs, consumerIdString, topicCount, loadBalancerListener)
+
+ val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams
+
+ // map of {topic -> Set(thread-1, thread-2, ...)}
+ val consumerThreadIdsPerTopic: Map[String, Set[String]] =
+ topicCount.getConsumerThreadIdsPerTopic
+
+ /*
+ * This usage of map flatten breaks up consumerThreadIdsPerTopic into
+ * a set of (topic, thread-id) pairs that we then use to construct
+ * the updated (topic, thread-id) -> queues map
+ */
+ implicit def getTopicThreadIds(v: (String, Set[String])): Set[(String, String)] = v._2.map((v._1, _))
+
+ // iterator over (topic, thread-id) tuples
+ val topicThreadIds: Iterable[(String, String)] =
+ consumerThreadIdsPerTopic.flatten
+
+ // list of (pairs of pairs): e.g., ((topic, thread-id),(queue, stream))
+ val threadQueueStreamPairs = topicCount match {
+ case wildTopicCount: WildcardTopicCount =>
+ for (tt <- topicThreadIds; qs <- queuesAndStreams) yield (tt -> qs)
+ case statTopicCount: StaticTopicCount => {
+ require(topicThreadIds.size == queuesAndStreams.size,
+ "Mismatch between thread ID count (%d) and queue count (%d)".format(
+ topicThreadIds.size, queuesAndStreams.size))
+ topicThreadIds.zip(queuesAndStreams)
+ }
+ }
+
+ threadQueueStreamPairs.foreach(e => {
+ val topicThreadId = e._1
+ val q = e._2._1
+ topicThreadIdAndQueues.put(topicThreadId, q)
+ })
+
+ val groupedByTopic = threadQueueStreamPairs.groupBy(_._1._1)
+ groupedByTopic.foreach(e => {
+ val topic = e._1
+ val streams = e._2.map(_._2._2).toList
+ topicStreamsMap += (topic -> streams)
+ debug("adding topic %s and %d streams to map.".format(topic, streams.size))
+ })
+
+ // listener to consumer and partition changes
+ zkClient.subscribeStateChanges(sessionExpirationListener)
+
+ zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
+
+ topicStreamsMap.foreach { topicAndStreams =>
+ // register on broker partition path changes
+ val partitionPath = BrokerTopicsPath + "/" + topicAndStreams._1
+ zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
+ }
+
+ // explicitly trigger load balancing for this consumer
+ loadBalancerListener.syncedRebalance()
+ }
+
+ class WildcardStreamsHandler[T](topicFilter: TopicFilter,
+ numStreams: Int,
+ decoder: Decoder[T])
+ extends TopicEventHandler[String] {
+
+ if (messageStreamCreated.getAndSet(true))
+ throw new RuntimeException("Each consumer connector can create " +
+ "message streams by filter at most once.")
+
+ private val wildcardQueuesAndStreams = (1 to numStreams)
+ .map(e => {
+ val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
+ val stream = new KafkaStream[T](
+ queue, config.consumerTimeoutMs, decoder, config.enableShallowIterator)
+ (queue, stream)
+ }).toList
+
+ // bootstrap with existing topics
+ private var wildcardTopics =
+ getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
+ .filter(topicFilter.isTopicAllowed)
+
+ private val wildcardTopicCount = TopicCount.constructTopicCount(
+ consumerIdString, topicFilter, numStreams, zkClient)
+
+ val dirs = new ZKGroupDirs(config.groupId)
+ registerConsumerInZK(dirs, consumerIdString, wildcardTopicCount)
+ reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams)
+
+ if (!topicFilter.requiresTopicEventWatcher) {
+ info("Not creating event watcher for trivial whitelist " + topicFilter)
+ }
+ else {
+ info("Creating topic event watcher for whitelist " + topicFilter)
+ wildcardTopicWatcher = new ZookeeperTopicEventWatcher(config, this)
+
+ /*
+ * Topic events will trigger subsequent synced rebalances. Also, the
+ * consumer will get registered only after an allowed topic becomes
+ * available.
+ */
+ }
+
+ def handleTopicEvent(allTopics: Seq[String]) {
+ debug("Handling topic event")
+
+ val updatedTopics = allTopics.filter(topicFilter.isTopicAllowed)
+
+ val addedTopics = updatedTopics filterNot (wildcardTopics contains)
+ if (addedTopics.nonEmpty)
+ info("Topic event: added topics = %s"
+ .format(addedTopics))
+
+ /*
+ * TODO: Deleted topics are interesting (and will not be a concern until
+ * 0.8 release). We may need to remove these topics from the rebalance
+ * listener's map in reinitializeConsumer.
+ */
+ val deletedTopics = wildcardTopics filterNot (updatedTopics contains)
+ if (deletedTopics.nonEmpty)
+ info("Topic event: deleted topics = %s"
+ .format(deletedTopics))
+
+ wildcardTopics = updatedTopics
+ info("Topics to consume = %s".format(wildcardTopics))
+
+ if (addedTopics.nonEmpty || deletedTopics.nonEmpty)
+ reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams)
+ }
+
+ def streams: Seq[KafkaStream[T]] =
+ wildcardQueuesAndStreams.map(_._2)
+ }
}
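
The pairing step inside reinitializeConsumer deserves a closer look: a static subscription zips the (topic, thread-id) pairs one-to-one with the (queue, stream) pairs (hence the require on matching sizes), while a wildcard subscription takes the cross product so every consumer thread is fed from every queue. Below is a minimal, self-contained Scala sketch of just that pairing; all values are hypothetical stand-ins, not real connector state.

    object PairingSketch extends App {
      // {topic -> Set(thread-id)}, shaped like getConsumerThreadIdsPerTopic output
      val consumerThreadIdsPerTopic =
        Map("t1" -> Set("g1_c1-0", "g1_c1-1"), "t2" -> Set("g1_c1-0"))

      // flatten into (topic, thread-id) pairs; toSeq keeps repeated topic keys
      // from collapsing back into a Map
      val topicThreadIds: Seq[(String, String)] =
        consumerThreadIdsPerTopic.toSeq.flatMap {
          case (topic, threads) => threads.toSeq.map(t => (topic, t))
        }

      // stand-ins for the (queue, stream) pairs
      val queuesAndStreams = List("q0" -> "s0", "q1" -> "s1", "q2" -> "s2")

      // static subscription: one (queue, stream) per thread id
      val staticPairs = topicThreadIds.zip(queuesAndStreams)

      // wildcard subscription: every thread id paired with every (queue, stream)
      val wildcardPairs =
        for (tt <- topicThreadIds; qs <- queuesAndStreams) yield (tt, qs)

      println(staticPairs.size)   // 3
      println(wildcardPairs.size) // 9
    }
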
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala Sat Apr 7 00:04:51 2012
@@ -21,11 +21,9 @@ import scala.collection.JavaConversions.
import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
import org.apache.zookeeper.Watcher.Event.KeeperState
-import kafka.server.KafkaServerStartable
-import kafka.common.ConsumerRebalanceFailedException
class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
- val eventHandler: TopicEventHandler[String], kafkaServerStartable: KafkaServerStartable) extends Logging {
+ val eventHandler: TopicEventHandler[String]) extends Logging {
val lock = new Object()
@@ -35,7 +33,7 @@ class ZookeeperTopicEventWatcher(val con
startWatchingTopicEvents()
private def startWatchingTopicEvents() {
- val topicEventListener = new ZkTopicEventListener(kafkaServerStartable)
+ val topicEventListener = new ZkTopicEventListener()
ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.BrokerTopicsPath)
zkClient.subscribeStateChanges(
@@ -52,6 +50,7 @@ class ZookeeperTopicEventWatcher(val con
def shutdown() {
lock.synchronized {
+ info("Shutting down topic event watcher.")
if (zkClient != null) {
stopWatchingTopicEvents()
zkClient.close()
@@ -62,7 +61,7 @@ class ZookeeperTopicEventWatcher(val con
}
}
- class ZkTopicEventListener(val kafkaServerStartable: KafkaServerStartable) extends IZkChildListener {
+ class ZkTopicEventListener extends IZkChildListener {
@throws(classOf[Exception])
def handleChildChange(parent: String, children: java.util.List[String]) {
@@ -76,11 +75,8 @@ class ZookeeperTopicEventWatcher(val con
}
}
catch {
- case e: ConsumerRebalanceFailedException =>
- fatal("can't rebalance in embedded consumer; proceed to shutdown", e)
- kafkaServerStartable.shutdown()
case e =>
- error("error in handling child changes in embedded consumer", e)
+ error("error in handling child changes", e)
}
}
}
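
With the KafkaServerStartable coupling gone, ZookeeperTopicEventWatcher only needs something implementing TopicEventHandler[String] to forward topic-path child changes to. A minimal sketch of such a handler, assuming nothing beyond the handleTopicEvent signature visible in this patch; the class name and the wiring comments are illustrative only.

    class LoggingTopicEventHandler extends TopicEventHandler[String] {
      def handleTopicEvent(allTopics: Seq[String]) {
        println("broker topic set is now: " + allTopics.mkString(", "))
      }
    }

    // wiring, given an existing ConsumerConfig:
    // val watcher = new ZookeeperTopicEventWatcher(consumerConfig, new LoggingTopicEventHandler)
    // ...
    // watcher.shutdown()
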
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java Sat Apr 7 00:04:51 2012
@@ -17,34 +17,53 @@
package kafka.javaapi.consumer;
-import kafka.consumer.KafkaMessageStream;
-import kafka.message.Message;
-import kafka.serializer.Decoder;
import java.util.List;
import java.util.Map;
+import kafka.consumer.KafkaStream;
+import kafka.consumer.TopicFilter;
+import kafka.message.Message;
+import kafka.serializer.Decoder;
public interface ConsumerConnector {
- /**
- * Create a list of MessageStreams of type T for each topic.
- *
- * @param topicCountMap a map of (topic, #streams) pair
- * @param decoder a decoder that converts from Message to T
- * @return a map of (topic, list of KafkaMessageStream) pair. The number of items in the
- * list is #streams. Each KafkaMessageStream supports an iterator of messages.
- */
- public <T> Map<String, List<KafkaMessageStream<T>>> createMessageStreams(
- Map<String, Integer> topicCountMap, Decoder<T> decoder);
- public Map<String, List<KafkaMessageStream<Message>>> createMessageStreams(
- Map<String, Integer> topicCountMap);
-
- /**
- * Commit the offsets of all broker partitions connected by this connector.
- */
- public void commitOffsets();
-
- /**
- * Shut down the connector
- */
- public void shutdown();
+ /**
+ * Create a list of MessageStreams of type T for each topic.
+ *
+ * @param topicCountMap a map of (topic, #streams) pairs
+ * @param decoder a decoder that converts from Message to T
+ * @return a map of (topic, list of KafkaStream) pairs.
+ * The number of items in the list is #streams. Each stream supports
+ * an iterator over message/metadata pairs.
+ */
+ public <T> Map<String, List<KafkaStream<T>>> createMessageStreams(
+ Map<String, Integer> topicCountMap, Decoder<T> decoder);
+ public Map<String, List<KafkaStream<Message>>> createMessageStreams(
+ Map<String, Integer> topicCountMap);
+
+ /**
+ * Create a list of message streams (KafkaStream) containing messages of type T.
+ *
+ * @param topicFilter a TopicFilter that specifies which topics to
+ * subscribe to (encapsulates a whitelist or a blacklist).
+ * @param numStreams the number of message streams to return.
+ * @param decoder a decoder that converts from Message to T
+ * @return a list of KafkaStream objects. Each stream supports an
+ * iterator over its MessageAndMetadata elements.
+ */
+ public <T> List<KafkaStream<T>> createMessageStreamsByFilter(
+ TopicFilter topicFilter, int numStreams, Decoder<T> decoder);
+ public List<KafkaStream<Message>> createMessageStreamsByFilter(
+ TopicFilter topicFilter, int numStreams);
+ public List<KafkaStream<Message>> createMessageStreamsByFilter(
+ TopicFilter topicFilter);
+
+ /**
+ * Commit the offsets of all broker partitions connected by this connector.
+ */
+ public void commitOffsets();
+
+ /**
+ * Shut down the connector
+ */
+ public void shutdown();
}
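
To make the new interface concrete, here is a hedged Scala sketch of consuming through the filter API. The Whitelist class is assumed from the newly added TopicFilter.scala (its source is not in this hunk), Consumer.create plus the zk.connect/groupid properties are the usual consumer bootstrap, and the stream is iterated directly on the strength of the javadoc above; treat all of those names as assumptions rather than part of this patch.

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}
    import kafka.serializer.DefaultDecoder

    val props = new Properties
    props.put("zk.connect", "localhost:2181")
    props.put("groupid", "filter-demo")
    val connector = Consumer.create(new ConsumerConfig(props))

    // two streams spanning every topic that matches the regex whitelist
    val streams =
      connector.createMessageStreamsByFilter(new Whitelist("white.*"), 2, new DefaultDecoder)

    // each stream iterates over MessageAndMetadata elements; iteration blocks
    // waiting for new data unless a consumer timeout is configured
    for (stream <- streams; msgAndMeta <- stream)
      println("message from topic " + msgAndMeta.topic)
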
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala Sat Apr 7 00:04:51 2012
@@ -16,9 +16,11 @@
*/
package kafka.javaapi.consumer
-import kafka.consumer.{KafkaMessageStream, ConsumerConfig}
import kafka.message.Message
import kafka.serializer.{DefaultDecoder, Decoder}
+import kafka.consumer._
+import scala.collection.JavaConversions.asList
+
/**
* This class handles the consumer's interaction with zookeeper
@@ -68,14 +70,14 @@ private[kafka] class ZookeeperConsumerCo
def createMessageStreams[T](
topicCountMap: java.util.Map[String,java.lang.Integer],
decoder: Decoder[T])
- : java.util.Map[String,java.util.List[KafkaMessageStream[T]]] = {
+ : java.util.Map[String,java.util.List[KafkaStream[T]]] = {
import scala.collection.JavaConversions._
val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]])
val scalaReturn = underlying.consume(scalaTopicCountMap, decoder)
- val ret = new java.util.HashMap[String,java.util.List[KafkaMessageStream[T]]]
+ val ret = new java.util.HashMap[String,java.util.List[KafkaStream[T]]]
for ((topic, streams) <- scalaReturn) {
- var javaStreamList = new java.util.ArrayList[KafkaMessageStream[T]]
+ var javaStreamList = new java.util.ArrayList[KafkaStream[T]]
for (stream <- streams)
javaStreamList.add(stream)
ret.put(topic, javaStreamList)
@@ -85,9 +87,17 @@ private[kafka] class ZookeeperConsumerCo
def createMessageStreams(
topicCountMap: java.util.Map[String,java.lang.Integer])
- : java.util.Map[String,java.util.List[KafkaMessageStream[Message]]] =
+ : java.util.Map[String,java.util.List[KafkaStream[Message]]] =
createMessageStreams(topicCountMap, new DefaultDecoder)
+ def createMessageStreamsByFilter[T](topicFilter: TopicFilter, numStreams: Int, decoder: Decoder[T]) =
+ asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, decoder))
+
+ def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) =
+ createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder)
+
+ def createMessageStreamsByFilter(topicFilter: TopicFilter) =
+ createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder)
def commitOffsets() {
underlying.commitOffsets
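
The javaapi wrapper above just delegates to the underlying connector and converts the Scala list with asList, so the filter calls look the same from the Java-facing side. A brief sketch, reusing props from the earlier example; the createJavaConsumerConnector factory and Whitelist are assumptions, not shown in this hunk.

    import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}

    val javaConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props))
    // one stream with the DefaultDecoder, via the single-argument overload
    val stream = javaConnector.createMessageStreamsByFilter(new Whitelist("white.*")).get(0)
    val iter = stream.iterator
    while (iter.hasNext)
      println("message from topic " + iter.next.topic)
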
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala Sat Apr 7 00:04:51 2012
@@ -17,8 +17,10 @@
package kafka.javaapi.message
+
import kafka.message.{MessageAndOffset, InvalidMessageException}
+
/**
* A set of messages. A message set has a fixed serialized form, though the container
* for the bytes could be either in-memory or on disk. The format of each message is
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java Sat Apr 7 00:04:51 2012
@@ -16,9 +16,9 @@
*/
package kafka.javaapi.producer.async;
-import kafka.producer.async.QueueItem;
import java.util.Properties;
+import kafka.producer.async.QueueItem;
/**
* Callback handler APIs for use in the async producer. The purpose is to
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java Sat Apr 7 00:04:51 2012
@@ -16,12 +16,12 @@
*/
package kafka.javaapi.producer.async;
-import kafka.javaapi.producer.SyncProducer;
-import kafka.producer.async.QueueItem;
-import kafka.serializer.Encoder;
import java.util.List;
import java.util.Properties;
+import kafka.javaapi.producer.SyncProducer;
+import kafka.producer.async.QueueItem;
+import kafka.serializer.Encoder;
/**
* Handler that dispatches the batched data from the queue of the
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Sat Apr 7 00:04:51 2012
@@ -36,7 +36,6 @@ import kafka.common.{MessageSizeTooLarge
class ByteBufferMessageSet(private val buffer: ByteBuffer,
private val initialOffset: Long = 0L,
private val errorCode: Int = ErrorMapping.NoError) extends MessageSet with Logging {
- private var validByteCount = -1L
private var shallowValidByteCount = -1L
if(sizeInBytes > Int.MaxValue)
throw new InvalidMessageSizeException("Message set cannot be larger than " + Int.MaxValue)
Added: incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndMetadata.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndMetadata.scala?rev=1310645&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndMetadata.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndMetadata.scala Sat Apr 7 00:04:51 2012
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+case class MessageAndMetadata[T](message: T, topic: String = "")
+
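
MessageAndMetadata is intentionally minimal at this point: just the decoded payload and the topic it came from, with the topic defaulting to the empty string. A quick illustration of the case-class ergonomics (the values are hypothetical):

    import kafka.message.MessageAndMetadata

    val bare   = MessageAndMetadata("payload")        // topic == ""
    val tagged = MessageAndMetadata("payload", "t1")
    tagged match {
      case MessageAndMetadata(msg, topic) => println(topic + ": " + msg)
    }
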
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndOffset.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndOffset.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndOffset.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndOffset.scala Sat Apr 7 00:04:51 2012
@@ -13,11 +13,10 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package kafka.message
-/**
- * Represents message and offset of the next message. This is used in the MessageSet to iterate over it
- */
-case class MessageAndOffset(val message: Message, val offset: Long)
+
+case class MessageAndOffset(message: Message, offset: Long)
+
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala Sat Apr 7 00:04:51 2012
@@ -22,9 +22,7 @@ import org.apache.log4j.spi.LoggingEvent
import org.apache.log4j.AppenderSkeleton
import org.apache.log4j.helpers.LogLog
import kafka.utils.Logging
-import kafka.serializer.Encoder
import java.util.{Properties, Date}
-import kafka.message.Message
import scala.collection._
class KafkaLog4jAppender extends AppenderSkeleton with Logging {
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala Sat Apr 7 00:04:51 2012
@@ -22,7 +22,7 @@ import kafka.utils._
import java.util.Properties
import kafka.cluster.{Partition, Broker}
import java.util.concurrent.atomic.AtomicBoolean
-import kafka.common.{NoBrokersForPartitionException, InvalidConfigException, InvalidPartitionException}
+import kafka.common.{NoBrokersForPartitionException, InvalidPartitionException}
import kafka.api.ProducerRequest
class Producer[K,V](config: ProducerConfig,