Posted to commits@kafka.apache.org by ju...@apache.org on 2012/04/07 02:04:54 UTC

svn commit: r1310645 [1/2] - in /incubator/kafka/trunk: contrib/hadoop-consumer/src/main/java/kafka/etl/ contrib/hadoop-consumer/src/main/java/kafka/etl/impl/ contrib/hadoop-producer/src/main/java/kafka/bridge/examples/ contrib/hadoop-producer/src/main...

Author: junrao
Date: Sat Apr  7 00:04:51 2012
New Revision: 1310645

URL: http://svn.apache.org/viewvc?rev=1310645&view=rev
Log:
Separate out Kafka mirroring into a stand-alone app; patched by Joel Koshy; reviewed by Jun Rao and Neha Narkhede; KAFKA-249

Added:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaStream.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicFilter.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndMetadata.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/MirrorMaker.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
    incubator/kafka/trunk/system_test/mirror_maker/
    incubator/kafka/trunk/system_test/mirror_maker/README
    incubator/kafka/trunk/system_test/mirror_maker/bin/
    incubator/kafka/trunk/system_test/mirror_maker/bin/expected.out
    incubator/kafka/trunk/system_test/mirror_maker/bin/run-test.sh
    incubator/kafka/trunk/system_test/mirror_maker/config/
    incubator/kafka/trunk/system_test/mirror_maker/config/blacklisttest.consumer.properties
    incubator/kafka/trunk/system_test/mirror_maker/config/mirror_producer.properties
    incubator/kafka/trunk/system_test/mirror_maker/config/server_source_1_1.properties
    incubator/kafka/trunk/system_test/mirror_maker/config/server_source_1_2.properties
    incubator/kafka/trunk/system_test/mirror_maker/config/server_source_2_1.properties
    incubator/kafka/trunk/system_test/mirror_maker/config/server_source_2_2.properties
    incubator/kafka/trunk/system_test/mirror_maker/config/server_target_1_1.properties
    incubator/kafka/trunk/system_test/mirror_maker/config/server_target_1_2.properties
    incubator/kafka/trunk/system_test/mirror_maker/config/whitelisttest_1.consumer.properties
    incubator/kafka/trunk/system_test/mirror_maker/config/whitelisttest_2.consumer.properties
    incubator/kafka/trunk/system_test/mirror_maker/config/zookeeper_source_1.properties
    incubator/kafka/trunk/system_test/mirror_maker/config/zookeeper_source_2.properties
    incubator/kafka/trunk/system_test/mirror_maker/config/zookeeper_target.properties
Removed:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala
    incubator/kafka/trunk/system_test/embedded_consumer/
Modified:
    incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
    incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java
    incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java
    incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java
    incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java
    incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
    incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
    incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
    incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
    incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
    incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
    incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java
    incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java
    incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndOffset.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/Logging.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/trunk/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
    incubator/kafka/trunk/examples/src/main/java/kafka/examples/Consumer.java
    incubator/kafka/trunk/examples/src/main/java/kafka/examples/ExampleUtils.java
    incubator/kafka/trunk/examples/src/main/java/kafka/examples/Producer.java
    incubator/kafka/trunk/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
    incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
    incubator/kafka/trunk/perf/src/main/scala/kafka/perf/PerfConfig.scala
    incubator/kafka/trunk/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala

Modified: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java (original)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java Sat Apr  7 00:04:51 2012
@@ -16,27 +16,25 @@
  */
 package kafka.etl;
 
+
 import java.io.IOException;
 import java.net.URI;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.zip.CRC32;
 import kafka.api.FetchRequest;
-import kafka.javaapi.MultiFetchResponse;
 import kafka.api.OffsetRequest;
 import kafka.common.ErrorMapping;
+import kafka.javaapi.MultiFetchResponse;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
 import kafka.message.MessageAndOffset;
-import kafka.message.MessageSet;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.lib.MultipleOutputs;
-import java.nio.ByteBuffer;
 
 @SuppressWarnings({ "deprecation"})
 public class KafkaETLContext {
@@ -139,7 +137,7 @@ public class KafkaETLContext {
             while ( !gotNext && _respIterator.hasNext()) {
                 ByteBufferMessageSet msgSet = _respIterator.next();
                 if ( hasError(msgSet)) return false;
-                _messageIt =  (Iterator<MessageAndOffset>) msgSet.iterator();
+                _messageIt = msgSet.iterator();
                 gotNext = get(key, value);
             }
         }
@@ -190,17 +188,17 @@ public class KafkaETLContext {
     
     protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
         if (_messageIt != null && _messageIt.hasNext()) {
-            MessageAndOffset msgAndOffset = _messageIt.next();
+            MessageAndOffset messageAndOffset = _messageIt.next();
             
-            ByteBuffer buf = msgAndOffset.message().payload();
+            ByteBuffer buf = messageAndOffset.message().payload();
             int origSize = buf.remaining();
             byte[] bytes = new byte[origSize];
-            buf.get(bytes, buf.position(), origSize);
+          buf.get(bytes, buf.position(), origSize);
             value.set(bytes, 0, origSize);
             
-            key.set(_index, _offset, msgAndOffset.message().checksum());
+            key.set(_index, _offset, messageAndOffset.message().checksum());
             
-            _offset = msgAndOffset.offset();  //increase offset
+            _offset = messageAndOffset.offset();  //increase offset
             _count ++;  //increase count
             
             return true;

Modified: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java (original)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java Sat Apr  7 00:04:51 2012
@@ -16,6 +16,7 @@
  */
 package kafka.etl;
 
+
 import java.io.IOException;
 import java.net.URI;
 import java.util.Map;
@@ -23,13 +24,13 @@ import kafka.consumer.SimpleConsumer;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.lib.MultipleOutputs;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
 
 
 @SuppressWarnings("deprecation")

Modified: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java (original)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java Sat Apr  7 00:04:51 2012
@@ -16,13 +16,13 @@
  */
 package kafka.etl;
 
+
 import java.net.URI;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.lib.MultipleOutputs;

Modified: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java (original)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java Sat Apr  7 00:04:51 2012
@@ -16,11 +16,11 @@
  */
 package kafka.etl;
 
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import org.apache.hadoop.io.WritableComparable;
-import kafka.etl.KafkaETLKey;
 
 public class KafkaETLKey implements WritableComparable<KafkaETLKey>{
 

Modified: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java (original)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java Sat Apr  7 00:04:51 2012
@@ -17,6 +17,7 @@
 
 package kafka.etl;
 
+
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -33,7 +34,6 @@ import java.util.Arrays;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Properties;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;

Modified: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java (original)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java Sat Apr  7 00:04:51 2012
@@ -17,32 +17,24 @@
 
 package kafka.etl.impl;
 
-import java.io.IOException;
+
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Random;
-import java.util.Map.Entry;
 import java.util.Properties;
-
-import kafka.message.NoCompressionCodec;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.JobConf;
-
+import java.util.Random;
 import kafka.etl.KafkaETLKey;
 import kafka.etl.KafkaETLRequest;
-import kafka.etl.KafkaETLUtils;
 import kafka.etl.Props;
 import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
 import kafka.javaapi.producer.SyncProducer;
+import kafka.message.Message;
 import kafka.producer.SyncProducerConfig;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
 
 /**
  * Use this class to produce test events to Kafka server. Each event contains a

Modified: incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java (original)
+++ incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java Sat Apr  7 00:04:51 2012
@@ -16,8 +16,9 @@
  */
 package kafka.bridge.examples;
 
-import kafka.bridge.hadoop.KafkaOutputFormat;
 
+import java.io.IOException;
+import kafka.bridge.hadoop.KafkaOutputFormat;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -27,8 +28,6 @@ import org.apache.hadoop.mapreduce.Mappe
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 
-import java.io.IOException;
-
 public class TextPublisher
 {
   public static void main(String[] args) throws Exception

Modified: incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java (original)
+++ incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java Sat Apr  7 00:04:51 2012
@@ -16,24 +16,26 @@
  */
 package kafka.bridge.hadoop;
 
-import java.util.Properties;
 
+import java.io.IOException;
+import java.net.URI;
+import java.util.Properties;
 import kafka.javaapi.producer.Producer;
 import kafka.message.Message;
 import kafka.producer.ProducerConfig;
-
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.log4j.Logger;
 
-import java.io.IOException;
-import java.net.URI;
-
 public class KafkaOutputFormat<W extends BytesWritable> extends OutputFormat<NullWritable, W>
 {
   private Logger log = Logger.getLogger(KafkaOutputFormat.class);

Modified: incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java (original)
+++ incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java Sat Apr  7 00:04:51 2012
@@ -16,19 +16,18 @@
  */
 package kafka.bridge.hadoop;
 
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
 import kafka.javaapi.producer.Producer;
 import kafka.javaapi.producer.ProducerData;
 import kafka.message.Message;
-
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
 public class KafkaRecordWriter<W extends BytesWritable> extends RecordWriter<NullWritable, W>
 {
   protected Producer<Integer, Message> producer;

Modified: incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java (original)
+++ incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java Sat Apr  7 00:04:51 2012
@@ -16,9 +16,12 @@
  */
 package kafka.bridge.pig;
 
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
 import kafka.bridge.hadoop.KafkaOutputFormat;
 import kafka.bridge.hadoop.KafkaRecordWriter;
-
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.Encoder;
 import org.apache.hadoop.fs.Path;
@@ -33,10 +36,6 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.piggybank.storage.avro.PigAvroDatumWriter;
 import org.apache.pig.piggybank.storage.avro.PigSchema2Avro;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
 public class AvroKafkaStorage extends StoreFunc
 {
   protected KafkaRecordWriter writer;

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala Sat Apr  7 00:04:51 2012
@@ -17,8 +17,6 @@
 
 package kafka
 
-import consumer.ConsumerConfig
-import producer.ProducerConfig
 import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
 import utils.{Utils, Logging}
 import org.apache.log4j.jmx.LoggerDynamicMBean
@@ -30,8 +28,8 @@ object Kafka extends Logging {
     import org.apache.log4j.Logger
     Utils.registerMBean(new LoggerDynamicMBean(Logger.getRootLogger()), kafkaLog4jMBeanName)
 
-    if (!List(1, 3).contains(args.length)) {
-      println("USAGE: java [options] %s server.properties [consumer.properties producer.properties]".format(classOf[KafkaServer].getSimpleName()))
+    if (args.length != 1) {
+      println("USAGE: java [options] %s server.properties".format(classOf[KafkaServer].getSimpleName()))
       System.exit(1)
     }
   
@@ -39,14 +37,7 @@ object Kafka extends Logging {
       val props = Utils.loadProps(args(0))
       val serverConfig = new KafkaConfig(props)
 
-      val kafkaServerStartble = args.length match {
-        case 3 =>
-          val consumerConfig = new ConsumerConfig(Utils.loadProps(args(1)))
-          val producerConfig = new ProducerConfig(Utils.loadProps(args(2)))
-          new KafkaServerStartable(serverConfig, consumerConfig, producerConfig)
-        case 1 =>
-          new KafkaServerStartable(serverConfig)
-      }
+      val kafkaServerStartble = new KafkaServerStartable(serverConfig)
 
       // attach shutdown handler to catch control-c
       Runtime.getRuntime().addShutdownHook(new Thread() {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Sat Apr  7 00:04:51 2012
@@ -36,11 +36,19 @@ object ConsoleConsumer extends Logging {
 
   def main(args: Array[String]) {
     val parser = new OptionParser
-    val topicIdOpt = parser.accepts("topic", "REQUIRED: The topic id to consume on.")
+    val topicIdOpt = parser.accepts("topic", "The topic id to consume on.")
                            .withRequiredArg
                            .describedAs("topic")
                            .ofType(classOf[String])
-    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + 
+    val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.")
+                             .withRequiredArg
+                             .describedAs("whitelist")
+                             .ofType(classOf[String])
+    val blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to exclude from consumption.")
+                             .withRequiredArg
+                             .describedAs("blacklist")
+                             .ofType(classOf[String])
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
                                       "Multiple URLS can be given to allow fail-over.")
                            .withRequiredArg
                            .describedAs("urls")
@@ -90,8 +98,20 @@ object ConsoleConsumer extends Logging {
         "skip it instead of halt.")
 
     val options: OptionSet = tryParse(parser, args)
-    checkRequiredArgs(parser, options, topicIdOpt, zkConnectOpt)
+    Utils.checkRequiredArgs(parser, options, zkConnectOpt)
     
+    val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
+    if (topicOrFilterOpt.size != 1) {
+      error("Exactly one of whitelist/blacklist/topic is required.")
+      parser.printHelpOn(System.err)
+      System.exit(1)
+    }
+    val topicArg = options.valueOf(topicOrFilterOpt.head)
+    val filterSpec = if (options.has(blacklistOpt))
+      new Blacklist(topicArg)
+    else
+      new Whitelist(topicArg)
+
     val props = new Properties()
     props.put("groupid", options.valueOf(groupIdOpt))
     props.put("socket.buffersize", options.valueOf(socketBufferSizeOpt).toString)
@@ -104,7 +124,6 @@ object ConsoleConsumer extends Logging {
     val config = new ConsumerConfig(props)
     val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
     
-    val topic = options.valueOf(topicIdOpt)
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
     val formatterArgs = tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))
     
@@ -123,21 +142,20 @@ object ConsoleConsumer extends Logging {
           tryCleanupZookeeper(options.valueOf(zkConnectOpt), options.valueOf(groupIdOpt))
       }
     })
-    
-    var stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).get.get(0)
-    val iter =
-      if(maxMessages >= 0)
-        stream.slice(0, maxMessages)
-      else
-        stream
+
+    val stream = connector.createMessageStreamsByFilter(filterSpec).get(0)
+    val iter = if(maxMessages >= 0)
+      stream.slice(0, maxMessages)
+    else
+      stream
 
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
     formatter.init(formatterArgs)
 
     try {
-      for(message <- iter) {
+      for(messageAndTopic <- iter) {
         try {
-          formatter.writeTo(message, System.out)
+          formatter.writeTo(messageAndTopic.message, System.out)
         } catch {
           case e =>
             if (skipMessageOnError)
@@ -173,16 +191,6 @@ object ConsoleConsumer extends Logging {
     }
   }
   
-  def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
-    for(arg <- required) {
-      if(!options.has(arg)) {
-        error("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
-  }
-  
   def tryParseFormatterArgs(args: Iterable[String]): Properties = {
     val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
     if(!splits.forall(_.length == 2)) {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala Sat Apr  7 00:04:51 2012
@@ -20,7 +20,6 @@ package kafka.consumer
 import java.util.Properties
 import kafka.utils.{ZKConfig, Utils}
 import kafka.api.OffsetRequest
-import kafka.common.InvalidConfigException
 object ConsumerConfig {
   val SocketTimeout = 30 * 1000
   val SocketBufferSize = 64*1024
@@ -90,27 +89,10 @@ class ConsumerConfig(props: Properties) 
   /** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
   val consumerTimeoutMs = Utils.getInt(props, "consumer.timeout.ms", ConsumerTimeoutMs)
 
-  /** Whitelist of topics for this mirror's embedded consumer to consume. At
-   *  most one of whitelist/blacklist may be specified. */
-  val mirrorTopicsWhitelist = Utils.getString(
-    props, MirrorTopicsWhitelistProp, MirrorTopicsWhitelist)
- 
-  /** Topics to skip mirroring. At most one of whitelist/blacklist may be
-   *  specified */
-  val mirrorTopicsBlackList = Utils.getString(
-    props, MirrorTopicsBlacklistProp, MirrorTopicsBlacklist)
-
-  if (mirrorTopicsWhitelist.nonEmpty && mirrorTopicsBlackList.nonEmpty)
-      throw new InvalidConfigException("The embedded consumer's mirror topics configuration can only contain one of blacklist or whitelist")
-
-  val mirrorConsumerNumThreads = Utils.getInt(
-    props, MirrorConsumerNumThreadsProp, MirrorConsumerNumThreads)
-
   /** Use shallow iterator over compressed messages directly. This feature should be used very carefully.
    *  Typically, it's only used for mirroring raw messages from one kafka cluster to another to save the
    *  overhead of decompression.
    *  */
   val enableShallowIterator = Utils.getBoolean(props, "shallowiterator.enable", false)
-
 }
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala Sat Apr  7 00:04:51 2012
@@ -29,12 +29,28 @@ trait ConsumerConnector {
    *  Create a list of MessageStreams for each topic.
    *
    *  @param topicCountMap  a map of (topic, #streams) pair
-   *  @return a map of (topic, list of  KafkaMessageStream) pair. The number of items in the
-   *          list is #streams. Each KafkaMessageStream supports an iterator of messages.
+   *  @param decoder Decoder to decode each Message to type T
+   *  @return a map of (topic, list of  KafkaStream) pairs.
+   *          The number of items in the list is #streams. Each stream supports
+   *          an iterator over message/metadata pairs.
    */
   def createMessageStreams[T](topicCountMap: Map[String,Int],
                               decoder: Decoder[T] = new DefaultDecoder)
-    : Map[String,List[KafkaMessageStream[T]]]
+    : Map[String,List[KafkaStream[T]]]
+
+  /**
+   *  Create a list of message streams for all topics that match a given filter.
+   *
+   *  @param topicFilter Either a Whitelist or Blacklist TopicFilter object.
+   *  @param numStreams Number of streams to return
+   *  @param decoder Decoder to decode each Message to type T
+   *  @return a list of KafkaStream each of which provides an
+   *          iterator over message/metadata pairs over allowed topics.
+   */
+  def createMessageStreamsByFilter[T](topicFilter: TopicFilter,
+                                      numStreams: Int = 1,
+                                      decoder: Decoder[T] = new DefaultDecoder)
+    : Seq[KafkaStream[T]]
 
   /**
    *  Commit the offsets of all broker partitions connected by this connector.
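
For reference, a minimal sketch of the new filter-based consumption API described above. It is illustrative only (not part of this revision); kafka.consumer.Consumer.create and the "zk.connect"/"groupid" property names are assumed from the 0.7-era consumer API rather than taken from this diff.

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}

object WildcardConsumerSketch {
  def main(args: Array[String]) {
    val props = new Properties()
    props.put("zk.connect", "localhost:2181")   // assumed 0.7-era property names
    props.put("groupid", "wildcard-example")
    val connector = Consumer.create(new ConsumerConfig(props))

    // One KafkaStream over every topic the whitelist regex allows,
    // using the default decoder and a single stream.
    val stream = connector.createMessageStreamsByFilter(new Whitelist("whitetopic.*")).head

    // Each element is a MessageAndMetadata, so the originating topic travels
    // with the message (field name assumed from MessageAndMetadata.scala).
    for (messageAndMetadata <- stream)
      println("received a message from topic " + messageAndMetadata.topic)
  }
}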

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Sat Apr  7 00:04:51 2012
@@ -19,37 +19,38 @@ package kafka.consumer
 
 import kafka.utils.{IteratorTemplate, Logging}
 import java.util.concurrent.{TimeUnit, BlockingQueue}
-import kafka.message.MessageAndOffset
 import kafka.serializer.Decoder
 import java.util.concurrent.atomic.AtomicReference
+import kafka.message.{MessageAndOffset, MessageAndMetadata}
+
 
 /**
  * An iterator that blocks until a value can be read from the supplied queue.
  * The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
  *
  */
-class ConsumerIterator[T](private val topic: String,
-                          private val channel: BlockingQueue[FetchedDataChunk],
+class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk],
                           consumerTimeoutMs: Int,
                           private val decoder: Decoder[T],
                           val enableShallowIterator: Boolean)
-  extends IteratorTemplate[T] with Logging {
+  extends IteratorTemplate[MessageAndMetadata[T]] with Logging {
 
   private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
   private var currentTopicInfo:PartitionTopicInfo = null
   private var consumedOffset: Long = -1L
 
-  override def next(): T = {
-    val decodedMessage = super.next()
+  override def next(): MessageAndMetadata[T] = {
+    val item = super.next()
     if(consumedOffset < 0)
       throw new IllegalStateException("Offset returned by the message set is invalid %d".format(consumedOffset))
     currentTopicInfo.resetConsumeOffset(consumedOffset)
-    trace("Setting consumed offset to %d".format(consumedOffset))
+    val topic = currentTopicInfo.topic
+    trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
     ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1)
-    decodedMessage
+    item
   }
 
-  protected def makeNext(): T = {
+  protected def makeNext(): MessageAndMetadata[T] = {
     var currentDataChunk: FetchedDataChunk = null
     // if we don't have an iterator, get one
     var localCurrent = current.get()
@@ -82,10 +83,11 @@ class ConsumerIterator[T](private val to
     }
     val item = localCurrent.next()
     consumedOffset = item.offset
-    decoder.toEvent(item.message)
+
+    new MessageAndMetadata(decoder.toEvent(item.message), currentTopicInfo.topic)
   }
 
-  def clearCurrentChunk() = {
+  def clearCurrentChunk() {
     try {
       info("Clearing the current data chunk for this consumer iterator")
       current.set(null)
@@ -94,3 +96,4 @@ class ConsumerIterator[T](private val to
 }
 
 class ConsumerTimeoutException() extends RuntimeException()
+

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala Sat Apr  7 00:04:51 2012
@@ -42,24 +42,24 @@ private [consumer] class Fetcher(val con
     fetcherThreads = EMPTY_FETCHER_THREADS
   }
 
-  def clearFetcherQueues[T](topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
+  def clearFetcherQueues(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
                             queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]],
-                            kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) {
+                            messageStreams: Map[String,List[KafkaStream[_]]]) {
 
     // Clear all but the currently iterated upon chunk in the consumer thread's queue
     queuesTobeCleared.foreach(_.clear)
     info("Cleared all relevant queues for this fetcher")
 
     // Also clear the currently iterated upon chunk in the consumer threads
-    if(kafkaMessageStreams != null)
-       kafkaMessageStreams.foreach(_._2.foreach(s => s.clear()))
+    if(messageStreams != null)
+       messageStreams.foreach(_._2.foreach(s => s.clear()))
 
     info("Cleared the data chunks in all the consumer message iterators")
 
   }
 
-  def startConnections[T](topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
-                            kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) {
+  def startConnections(topicInfos: Iterable[PartitionTopicInfo],
+                       cluster: Cluster) {
     if (topicInfos == null)
       return
 

Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaStream.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaStream.scala?rev=1310645&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaStream.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaStream.scala Sat Apr  7 00:04:51 2012
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+
+import java.util.concurrent.BlockingQueue
+import kafka.serializer.Decoder
+import kafka.message.MessageAndMetadata
+
+class KafkaStream[T](private val queue: BlockingQueue[FetchedDataChunk],
+                     consumerTimeoutMs: Int,
+                     private val decoder: Decoder[T],
+                     val enableShallowIterator: Boolean)
+   extends Iterable[MessageAndMetadata[T]] with java.lang.Iterable[MessageAndMetadata[T]] {
+
+  private val iter: ConsumerIterator[T] =
+    new ConsumerIterator[T](queue, consumerTimeoutMs, decoder, enableShallowIterator)
+
+  /**
+   *  Create an iterator over messages in the stream.
+   */
+  def iterator(): ConsumerIterator[T] = iter
+
+  /**
+   * This method clears the queue being iterated during the consumer rebalancing. This is mainly
+   * to reduce the number of duplicates received by the consumer
+   */
+  def clear() {
+    iter.clearCurrentChunk()
+  }
+
+}

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala Sat Apr  7 00:04:51 2012
@@ -19,35 +19,18 @@ package kafka.consumer
 
 import scala.collection._
 import scala.util.parsing.json.JSON
-import kafka.utils.Logging
+import org.I0Itec.zkclient.ZkClient
+import java.util.regex.Pattern
+import kafka.utils.{ZKGroupDirs, ZkUtils, Logging}
 
-private[kafka] object TopicCount extends Logging {
-  val myConversionFunc = {input : String => input.toInt}
-  JSON.globalNumberParser = myConversionFunc
-
-  def constructTopicCount(consumerIdSting: String, jsonString : String) : TopicCount = {
-    var topMap : Map[String,Int] = null
-    try {
-      JSON.parseFull(jsonString) match {
-        case Some(m) => topMap = m.asInstanceOf[Map[String,Int]]
-        case None => throw new RuntimeException("error constructing TopicCount : " + jsonString)
-      }
-    }
-    catch {
-      case e =>
-        error("error parsing consumer json string " + jsonString, e)
-        throw e
-    }
-
-    new TopicCount(consumerIdSting, topMap)
-  }
-
-}
 
-private[kafka] class TopicCount(val consumerIdString: String, val topicCountMap: Map[String, Int]) {
-
-  def getConsumerThreadIdsPerTopic()
-    : Map[String, Set[String]] = {
+private[kafka] trait TopicCount {
+  def getConsumerThreadIdsPerTopic: Map[String, Set[String]]
+
+  def dbString: String
+  
+  protected def makeConsumerThreadIdsPerTopic(consumerIdString: String,
+                                            topicCountMap: Map[String,  Int]) = {
     val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[String]]()
     for ((topic, nConsumers) <- topicCountMap) {
       val consumerSet = new mutable.HashSet[String]
@@ -58,11 +41,96 @@ private[kafka] class TopicCount(val cons
     }
     consumerThreadIdsPerTopicMap
   }
+}
+
+private[kafka] object TopicCount extends Logging {
+
+  /*
+   * Example of whitelist topic count stored in ZooKeeper:
+   * Topics with whitetopic as prefix, and four streams: *4*whitetopic.*
+   *
+   * Example of blacklist topic count stored in ZooKeeper:
+   * Topics with blacktopic as prefix, and four streams: !4!blacktopic.*
+   */
+
+  val WHITELIST_MARKER = "*"
+  val BLACKLIST_MARKER = "!"
+  private val WHITELIST_PATTERN =
+    Pattern.compile("""\*(\p{Digit}+)\*(.*)""")
+  private val BLACKLIST_PATTERN =
+    Pattern.compile("""!(\p{Digit}+)!(.*)""")
+
+  val myConversionFunc = {input : String => input.toInt}
+  JSON.globalNumberParser = myConversionFunc
+
+  def constructTopicCount(group: String,
+                          consumerId: String,
+                          zkClient: ZkClient) : TopicCount = {
+    val dirs = new ZKGroupDirs(group)
+    val topicCountString = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)
+    val hasWhitelist = topicCountString.startsWith(WHITELIST_MARKER)
+    val hasBlacklist = topicCountString.startsWith(BLACKLIST_MARKER)
+
+    if (hasWhitelist || hasBlacklist)
+      info("Constructing topic count for %s from %s using %s as pattern."
+        .format(consumerId, topicCountString,
+          if (hasWhitelist) WHITELIST_PATTERN else BLACKLIST_PATTERN))
+
+    if (hasWhitelist || hasBlacklist) {
+      val matcher = if (hasWhitelist)
+        WHITELIST_PATTERN.matcher(topicCountString)
+      else
+        BLACKLIST_PATTERN.matcher(topicCountString)
+      require(matcher.matches())
+      val numStreams = matcher.group(1).toInt
+      val regex = matcher.group(2)
+      val filter = if (hasWhitelist)
+        new Whitelist(regex)
+      else
+        new Blacklist(regex)
+
+      new WildcardTopicCount(zkClient, consumerId, filter, numStreams)
+    }
+    else {
+      var topMap : Map[String,Int] = null
+      try {
+        JSON.parseFull(topicCountString) match {
+          case Some(m) => topMap = m.asInstanceOf[Map[String,Int]]
+          case None => throw new RuntimeException("error constructing TopicCount : " + topicCountString)
+        }
+      }
+      catch {
+        case e =>
+          error("error parsing consumer json string " + topicCountString, e)
+          throw e
+      }
+
+      new StaticTopicCount(consumerId, topMap)
+    }
+  }
+
+  def constructTopicCount(consumerIdString: String, topicCount: Map[String,  Int]) =
+    new StaticTopicCount(consumerIdString, topicCount)
+
+  def constructTopicCount(consumerIdString: String,
+                          filter: TopicFilter,
+                          numStreams: Int,
+                          zkClient: ZkClient) =
+    new WildcardTopicCount(zkClient, consumerIdString, filter, numStreams)
+
+}
+
+private[kafka] class StaticTopicCount(val consumerIdString: String,
+                                val topicCountMap: Map[String, Int])
+                                extends TopicCount {
+
+  def getConsumerThreadIdsPerTopic =
+    makeConsumerThreadIdsPerTopic(consumerIdString, topicCountMap)
 
   override def equals(obj: Any): Boolean = {
     obj match {
       case null => false
-      case n: TopicCount => consumerIdString == n.consumerIdString && topicCountMap == n.topicCountMap
+      case n: StaticTopicCount => consumerIdString == n.consumerIdString && topicCountMap == n.topicCountMap
       case _ => false
     }
   }
@@ -73,7 +141,7 @@ private[kafka] class TopicCount(val cons
    *    "topic2" : 4
    *  }
    */
-  def toJsonString() : String = {
+  def dbString = {
     val builder = new StringBuilder
     builder.append("{ ")
     var i = 0
@@ -84,6 +152,29 @@ private[kafka] class TopicCount(val cons
       i += 1
     }
     builder.append(" }")
-    builder.toString
+    builder.toString()
   }
 }
+
+private[kafka] class WildcardTopicCount(zkClient: ZkClient,
+                                        consumerIdString: String,
+                                        topicFilter: TopicFilter,
+                                        numStreams: Int) extends TopicCount {
+  def getConsumerThreadIdsPerTopic = {
+    val wildcardTopics = ZkUtils.getChildrenParentMayNotExist(
+      zkClient, ZkUtils.BrokerTopicsPath).filter(topicFilter.isTopicAllowed(_))
+    makeConsumerThreadIdsPerTopic(consumerIdString,
+                                  Map(wildcardTopics.map((_, numStreams)): _*))
+  }
+
+  def dbString = {
+    val marker = topicFilter match {
+      case wl: Whitelist => TopicCount.WHITELIST_MARKER
+      case bl: Blacklist => TopicCount.BLACKLIST_MARKER
+    }
+
+    "%s%d%s%s".format(marker, numStreams, marker, topicFilter.regex)
+  }
+
+}
+
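
To make the ZooKeeper representation above concrete, a hedged sketch of the two dbString forms. TopicCount is private[kafka], so this only compiles from within the kafka package; the ZkClient argument is unused by dbString and is stubbed with null here.

package kafka.examples   // hypothetical location, inside kafka.* for visibility

import kafka.consumer.{TopicCount, Whitelist}

object TopicCountStrings {
  def main(args: Array[String]) {
    // Static subscription: a small JSON map of topic -> stream count.
    val static = TopicCount.constructTopicCount("group1_consumer1", Map("topic1" -> 2))
    println(static.dbString)    // e.g. { "topic1" : 2 }

    // Wildcard subscription: marker + stream count + marker + regex.
    val wildcard = TopicCount.constructTopicCount("group1_consumer1", new Whitelist("whitetopic.*"), 4, null)
    println(wildcard.dbString)  // *4*whitetopic.*
  }
}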

Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicFilter.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicFilter.scala?rev=1310645&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicFilter.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicFilter.scala Sat Apr  7 00:04:51 2012
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+
+import kafka.utils.Logging
+import java.util.regex.{PatternSyntaxException, Pattern}
+
+
+sealed abstract class TopicFilter(rawRegex: String) extends Logging {
+
+  val regex = rawRegex
+          .trim
+          .replace(',', '|')
+          .replace(" ", "")
+          .replaceAll("""^["']+""","")
+          .replaceAll("""["']+$""","") // property files may bring quotes
+
+  try {
+    Pattern.compile(regex)
+  }
+  catch {
+    case e: PatternSyntaxException =>
+      throw new RuntimeException(regex + " is an invalid regex.")
+  }
+
+  override def toString = regex
+
+  def requiresTopicEventWatcher: Boolean
+
+  def isTopicAllowed(topic: String): Boolean
+}
+
+case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
+  override def requiresTopicEventWatcher = !regex.matches("""[\p{Alnum}-|]+""")
+
+  override def isTopicAllowed(topic: String) = {
+    val allowed = topic.matches(regex)
+
+    debug("%s %s".format(
+      topic, if (allowed) "allowed" else "filtered"))
+
+    allowed
+  }
+
+
+}
+
+case class Blacklist(rawRegex: String) extends TopicFilter(rawRegex) {
+  override def requiresTopicEventWatcher = true
+
+  override def isTopicAllowed(topic: String) = {
+    val allowed = !topic.matches(regex)
+
+    debug("%s %s".format(
+      topic, if (allowed) "allowed" else "filtered"))
+
+    allowed
+  }
+}
+
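
A short illustration of the filters above: the raw regex is trimmed, commas become '|', whitespace and surrounding quotes are stripped, and a Blacklist simply inverts the match. Illustrative only:

import kafka.consumer.{Blacklist, Whitelist}

object TopicFilterSketch {
  def main(args: Array[String]) {
    // "white.*, green.*" normalizes to the regex "white.*|green.*"
    val white = Whitelist("white.*, green.*")
    println(white.isTopicAllowed("whitetopic1"))   // true
    println(white.isTopicAllowed("blacktopic1"))   // false

    // A blacklist allows everything its regex does not match.
    val black = Blacklist("blacktopic.*")
    println(black.isTopicAllowed("whitetopic1"))   // true
    println(black.isTopicAllowed("blacktopic1"))   // false
  }
}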

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Sat Apr  7 00:04:51 2012
@@ -34,6 +34,7 @@ import kafka.common.{ConsumerRebalanceFa
 import java.lang.IllegalStateException
 import kafka.utils.ZkUtils._
 
+
 /**
  * This class handles the consumers interaction with zookeeper
  *
@@ -86,16 +87,37 @@ trait ZookeeperConsumerConnectorMBean {
 
 private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                 val enableFetcher: Boolean) // for testing only
-  extends ConsumerConnector with ZookeeperConsumerConnectorMBean with Logging {
-
+        extends ConsumerConnector with ZookeeperConsumerConnectorMBean
+        with Logging {
   private val isShuttingDown = new AtomicBoolean(false)
   private val rebalanceLock = new Object
   private var fetcher: Option[Fetcher] = None
   private var zkClient: ZkClient = null
   private var topicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
-  // queues : (topic,consumerThreadId) -> queue
-  private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
+  // topicThreadIdAndQueues : (topic,consumerThreadId) -> queue
+  private val topicThreadIdAndQueues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
   private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false)
+  private val messageStreamCreated = new AtomicBoolean(false)
+
+  private var sessionExpirationListener: ZKSessionExpireListener = null
+  private var loadBalancerListener: ZKRebalancerListener = null
+
+  private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
+
+  val consumerIdString = {
+    var consumerUuid : String = null
+    config.consumerId match {
+      case Some(consumerId) // for testing only
+      => consumerUuid = consumerId
+      case None // generate unique consumerId automatically
+      => val uuid = UUID.randomUUID()
+      consumerUuid = "%s-%d-%s".format(
+        InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
+        uuid.getMostSignificantBits().toHexString.substring(0,8))
+    }
+    config.groupId + "_" + consumerUuid
+  }
+  this.logIdent = consumerIdString + " "
 
   connectZk()
   createFetcher()
@@ -108,10 +130,18 @@ private[kafka] class ZookeeperConsumerCo
 
   def createMessageStreams[T](topicCountMap: Map[String,Int],
                               decoder: Decoder[T])
-      : Map[String,List[KafkaMessageStream[T]]] = {
+      : Map[String,List[KafkaStream[T]]] = {
+    if (messageStreamCreated.getAndSet(true))
+      throw new RuntimeException(this.getClass.getSimpleName +
+                                   " can create message streams at most once")
     consume(topicCountMap, decoder)
   }
 
+  def createMessageStreamsByFilter[T](topicFilter: TopicFilter, numStreams: Int, decoder: Decoder[T]) = {
+    val wildcardStreamsHandler = new WildcardStreamsHandler[T](topicFilter, numStreams, decoder)
+    wildcardStreamsHandler.streams
+  }
+
   private def createFetcher() {
     if (enableFetcher)
       fetcher = Some(new Fetcher(config, zkClient))
@@ -126,6 +156,9 @@ private[kafka] class ZookeeperConsumerCo
     val canShutdown = isShuttingDown.compareAndSet(false, true);
     if (canShutdown) {
       info("ZKConsumerConnector shutting down")
+
+      if (wildcardTopicWatcher != null)
+        wildcardTopicWatcher.shutdown()
       try {
         scheduler.shutdownNow()
         fetcher match {
@@ -150,69 +183,42 @@ private[kafka] class ZookeeperConsumerCo
 
   def consume[T](topicCountMap: scala.collection.Map[String,Int],
                  decoder: Decoder[T])
-      : Map[String,List[KafkaMessageStream[T]]] = {
+      : Map[String,List[KafkaStream[T]]] = {
     debug("entering consume ")
     if (topicCountMap == null)
       throw new RuntimeException("topicCountMap is null")
 
-    val dirs = new ZKGroupDirs(config.groupId)
-    var ret = new mutable.HashMap[String,List[KafkaMessageStream[T]]]
+    val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)
 
-    var consumerUuid : String = null
-    config.consumerId match {
-      case Some(consumerId) // for testing only
-      => consumerUuid = consumerId
-      case None // generate unique consumerId automatically
-      => val uuid = UUID.randomUUID()
-        consumerUuid = "%s-%d-%s".format(
-          InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
-          uuid.getMostSignificantBits().toHexString.substring(0,8))
-    }
-    val consumerIdString = config.groupId + "_" + consumerUuid
-    val topicCount = new TopicCount(consumerIdString, topicCountMap)
-
-    // create a queue per topic per consumer thread
-    val consumerThreadIdsPerTopic = topicCount.getConsumerThreadIdsPerTopic
-    for ((topic, threadIdSet) <- consumerThreadIdsPerTopic) {
-      var streamList: List[KafkaMessageStream[T]] = Nil
-      for (threadId <- threadIdSet) {
-        val stream = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
-        queues.put((topic, threadId), stream)
-        streamList ::= new KafkaMessageStream[T](topic, stream, config.consumerTimeoutMs, decoder, config.enableShallowIterator)
-      }
-      ret += (topic -> streamList)
-      debug("adding topic " + topic + " and stream to map..")
-    }
-
-    // listener to consumer and partition changes
-    val loadBalancerListener = new ZKRebalancerListener[T](config.groupId, consumerIdString, ret)
-    registerConsumerInZK(dirs, consumerIdString, topicCount)
+    val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic
 
-    // register listener for session expired event
-    zkClient.subscribeStateChanges(
-      new ZKSessionExpireListener[T](dirs, consumerIdString, topicCount, loadBalancerListener))
-
-    zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
+    // make a list of (queue,stream) pairs, one pair for each threadId
+    val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
+      threadIdSet.map(_ => {
+        val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
+        val stream = new KafkaStream[T](
+          queue, config.consumerTimeoutMs, decoder, config.enableShallowIterator)
+        (queue, stream)
+      })
+    ).flatten.toList
 
-    ret.foreach { topicAndStreams =>
-      // register on broker partition path changes
-      val partitionPath = BrokerTopicsPath + "/" + topicAndStreams._1
-      zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
-    }
+    val dirs = new ZKGroupDirs(config.groupId)
+    registerConsumerInZK(dirs, consumerIdString, topicCount)
+    reinitializeConsumer(topicCount, queuesAndStreams)
 
-    // explicitly trigger load balancing for this consumer
-    loadBalancerListener.syncedRebalance()
-    ret
+    loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[T]]]]
   }
 
-  private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
+  private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) {
     info("begin registering consumer " + consumerIdString + " in ZK")
-    createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, topicCount.toJsonString)
+    createEphemeralPathExpectConflict(zkClient,
+                                      dirs.consumerRegistryDir + "/" + consumerIdString,
+                                      topicCount.dbString)
     info("end registering consumer " + consumerIdString + " in ZK")
   }
 
   private def sendShutdownToAllQueues() = {
-    for (queue <- queues.values) {
+    for (queue <- topicThreadIdAndQueues.values) {
       debug("Clearing up queue")
       queue.clear()
       queue.put(ZookeeperConsumerConnector.shutdownCommand)
@@ -334,10 +340,10 @@ private[kafka] class ZookeeperConsumerCo
     producedOffset
   }
 
-  class ZKSessionExpireListener[T](val dirs: ZKGroupDirs,
+  class ZKSessionExpireListener(val dirs: ZKGroupDirs,
                                  val consumerIdString: String,
                                  val topicCount: TopicCount,
-                                 val loadBalancerListener: ZKRebalancerListener[T])
+                                 val loadBalancerListener: ZKRebalancerListener)
     extends IZkStateListener {
     @throws(classOf[Exception])
     def handleStateChanged(state: KeeperState) {
@@ -359,10 +365,10 @@ private[kafka] class ZookeeperConsumerCo
        *  consumer in the consumer registry and trigger a rebalance.
        */
       info("ZK expired; release old broker parition ownership; re-register consumer " + consumerIdString)
-      loadBalancerListener.resetState
+      loadBalancerListener.resetState()
       registerConsumerInZK(dirs, consumerIdString, topicCount)
       // explicitly trigger load balancing for this consumer
-      loadBalancerListener.syncedRebalance
+      loadBalancerListener.syncedRebalance()
 
       // There is no need to resubscribe to child and state changes.
       // The child change watchers will be set inside rebalance when we read the children list.
@@ -370,8 +376,8 @@ private[kafka] class ZookeeperConsumerCo
 
   }
 
-  class ZKRebalancerListener[T](val group: String, val consumerIdString: String,
-                                kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]])
+  class ZKRebalancerListener(val group: String, val consumerIdString: String,
+                             val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_]]])
     extends IZkChildListener {
     private var isWatcherTriggered = false
     private val lock = new ReentrantLock
@@ -459,7 +465,7 @@ private[kafka] class ZookeeperConsumerCo
               info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
           }
           // stop all fetchers and clear all the queues to avoid data duplication
-          closeFetchersForQueues(cluster, kafkaMessageStreams, queues.map(q => q._2))
+          closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
           Thread.sleep(config.rebalanceBackoffMs)
         }
       }
@@ -468,7 +474,7 @@ private[kafka] class ZookeeperConsumerCo
     }
 
     private def rebalance(cluster: Cluster): Boolean = {
-      val myTopicThreadIdsMap = getTopicCount(zkClient, group, consumerIdString).getConsumerThreadIdsPerTopic
+      val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
       val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
       val partitionsPerTopicMap = getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keys.iterator)
 
@@ -478,7 +484,7 @@ private[kafka] class ZookeeperConsumerCo
        * But if we don't stop the fetchers first, this consumer would continue returning data for released
        * partitions in parallel. So, not stopping the fetchers leads to duplicate data.
        */
-      closeFetchers(cluster, kafkaMessageStreams, myTopicThreadIdsMap)
+      closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
 
       releasePartitionOwnership(topicRegistry)
 
@@ -531,7 +537,7 @@ private[kafka] class ZookeeperConsumerCo
         debug("Partitions per topic cache " + partitionsPerTopicMap)
         debug("Consumers per topic cache " + consumersPerTopicMap)
         topicRegistry = currentTopicRegistry
-        updateFetcher(cluster, kafkaMessageStreams)
+        updateFetcher(cluster)
         true
       }else {
         false
@@ -539,12 +545,12 @@ private[kafka] class ZookeeperConsumerCo
     }
 
     private def closeFetchersForQueues(cluster: Cluster,
-                                       kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]],
+                                       messageStreams: Map[String,List[KafkaStream[_]]],
                                        queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
       var allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
       fetcher match {
         case Some(f) => f.stopConnectionsToAllBrokers
-        f.clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, kafkaMessageStreams)
+        f.clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
         info("Committing all offsets after clearing the fetcher queues")
         /**
         * here, we need to commit offsets before stopping the consumer from returning any more messages
@@ -559,16 +565,15 @@ private[kafka] class ZookeeperConsumerCo
       }
     }
 
-    private def closeFetchers(cluster: Cluster, kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]],
+    private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_]]],
                               relevantTopicThreadIdsMap: Map[String, Set[String]]) {
       // only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer
       // after this rebalancing attempt
-      val queuesTobeCleared = queues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2)
-      closeFetchersForQueues(cluster, kafkaMessageStreams, queuesTobeCleared)
+      val queuesTobeCleared = topicThreadIdAndQueues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2)
+      closeFetchersForQueues(cluster, messageStreams, queuesTobeCleared)
     }
 
-    private def updateFetcher[T](cluster: Cluster,
-                                 kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) {
+    private def updateFetcher(cluster: Cluster) {
       // update partitions for fetcher
       var allPartitionInfos : List[PartitionTopicInfo] = Nil
       for (partitionInfos <- topicRegistry.values)
@@ -579,7 +584,7 @@ private[kafka] class ZookeeperConsumerCo
 
       fetcher match {
         case Some(f) =>
-          f.startConnections(allPartitionInfos, cluster, kafkaMessageStreams)
+          f.startConnections(allPartitionInfos, cluster)
         case None =>
       }
     }
@@ -637,7 +642,7 @@ private[kafka] class ZookeeperConsumerCo
         }
       else
         offset = offsetString.toLong
-      val queue = queues.get((topic, consumerThreadId))
+      val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
       val consumedOffset = new AtomicLong(offset)
       val fetchedOffset = new AtomicLong(offset)
       val partTopicInfo = new PartitionTopicInfo(topic,
@@ -651,5 +656,155 @@ private[kafka] class ZookeeperConsumerCo
       debug(partTopicInfo + " selected new offset " + offset)
     }
   }
+
+  private def reinitializeConsumer[T](
+      topicCount: TopicCount,
+      queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[T])]) {
+
+    val dirs = new ZKGroupDirs(config.groupId)
+
+    // listener to consumer and partition changes
+    if (loadBalancerListener == null) {
+      val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[T]]]
+      loadBalancerListener = new ZKRebalancerListener(
+        config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_]]]])
+    }
+
+    // register listener for session expired event
+    if (sessionExpirationListener == null)
+      sessionExpirationListener = new ZKSessionExpireListener(
+        dirs, consumerIdString, topicCount, loadBalancerListener)
+
+    val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams
+
+    // map of {topic -> Set(thread-1, thread-2, ...)}
+    val consumerThreadIdsPerTopic: Map[String, Set[String]] =
+      topicCount.getConsumerThreadIdsPerTopic
+
+    /*
+     * This usage of map flatten breaks up consumerThreadIdsPerTopic into
+     * a set of (topic, thread-id) pairs that we then use to construct
+     * the updated (topic, thread-id) -> queues map
+     */
+    implicit def getTopicThreadIds(v: (String, Set[String])): Set[(String, String)] = v._2.map((v._1, _))
+
+    // iterator over (topic, thread-id) tuples
+    val topicThreadIds: Iterable[(String, String)] =
+      consumerThreadIdsPerTopic.flatten
+
+    // list of (pairs of pairs): e.g., ((topic, thread-id),(queue, stream))
+    val threadQueueStreamPairs = topicCount match {
+      case wildTopicCount: WildcardTopicCount =>
+        for (tt <- topicThreadIds; qs <- queuesAndStreams) yield (tt -> qs)
+      case statTopicCount: StaticTopicCount => {
+        require(topicThreadIds.size == queuesAndStreams.size,
+          "Mismatch between thread ID count (%d) and queue count (%d)".format(
+          topicThreadIds.size, queuesAndStreams.size))
+        topicThreadIds.zip(queuesAndStreams)
+      }
+    }
+
+    threadQueueStreamPairs.foreach(e => {
+      val topicThreadId = e._1
+      val q = e._2._1
+      topicThreadIdAndQueues.put(topicThreadId, q)
+    })
+
+    val groupedByTopic = threadQueueStreamPairs.groupBy(_._1._1)
+    groupedByTopic.foreach(e => {
+      val topic = e._1
+      val streams = e._2.map(_._2._2).toList
+      topicStreamsMap += (topic -> streams)
+      debug("adding topic %s and %d streams to map.".format(topic, streams.size))
+    })
+
+    // listener to consumer and partition changes
+    zkClient.subscribeStateChanges(sessionExpirationListener)
+
+    zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
+
+    topicStreamsMap.foreach { topicAndStreams =>
+      // register on broker partition path changes
+      val partitionPath = BrokerTopicsPath + "/" + topicAndStreams._1
+      zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
+    }
+
+    // explicitly trigger load balancing for this consumer
+    loadBalancerListener.syncedRebalance()
+  }
+
+  class WildcardStreamsHandler[T](topicFilter: TopicFilter,
+                                  numStreams: Int,
+                                  decoder: Decoder[T])
+                                extends TopicEventHandler[String] {
+
+    if (messageStreamCreated.getAndSet(true))
+      throw new RuntimeException("Each consumer connector can create " +
+        "message streams by filter at most once.")
+
+    private val wildcardQueuesAndStreams = (1 to numStreams)
+      .map(e => {
+        val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
+        val stream = new KafkaStream[T](
+          queue, config.consumerTimeoutMs, decoder, config.enableShallowIterator)
+        (queue, stream)
+    }).toList
+
+     // bootstrap with existing topics
+    private var wildcardTopics =
+      getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
+        .filter(topicFilter.isTopicAllowed)
+
+    private val wildcardTopicCount = TopicCount.constructTopicCount(
+      consumerIdString, topicFilter, numStreams, zkClient)
+
+    val dirs = new ZKGroupDirs(config.groupId)
+    registerConsumerInZK(dirs, consumerIdString, wildcardTopicCount)
+    reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams)
+
+    if (!topicFilter.requiresTopicEventWatcher) {
+      info("Not creating event watcher for trivial whitelist " + topicFilter)
+    }
+    else {
+      info("Creating topic event watcher for whitelist " + topicFilter)
+      wildcardTopicWatcher = new ZookeeperTopicEventWatcher(config, this)
+
+      /*
+       * Topic events will trigger subsequent synced rebalances. Also, the
+       * consumer will get registered only after an allowed topic becomes
+       * available.
+       */
+    }
+
+    def handleTopicEvent(allTopics: Seq[String]) {
+      debug("Handling topic event")
+
+      val updatedTopics = allTopics.filter(topicFilter.isTopicAllowed)
+
+      val addedTopics = updatedTopics filterNot (wildcardTopics contains)
+      if (addedTopics.nonEmpty)
+        info("Topic event: added topics = %s"
+                             .format(addedTopics))
+
+      /*
+       * TODO: Deleted topics are interesting (and will not be a concern until
+       * 0.8 release). We may need to remove these topics from the rebalance
+       * listener's map in reinitializeConsumer.
+       */
+      val deletedTopics = wildcardTopics filterNot (updatedTopics contains)
+      if (deletedTopics.nonEmpty)
+        info("Topic event: deleted topics = %s"
+                             .format(deletedTopics))
+
+      wildcardTopics = updatedTopics
+      info("Topics to consume = %s".format(wildcardTopics))
+
+      if (addedTopics.nonEmpty || deletedTopics.nonEmpty)
+        reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams)
+    }
+
+    def streams: Seq[KafkaStream[T]] =
+      wildcardQueuesAndStreams.map(_._2)
+  }
 }
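
The reshaping that reinitializeConsumer performs above can be seen in isolation with plain Scala collections: the {topic -> Set(thread-id)} map is flattened into (topic, thread-id) pairs, zipped against the (queue, stream) pairs in the static-subscription case, and grouped back by topic. The sketch below uses made-up stand-in strings instead of the real LinkedBlockingQueue/KafkaStream pairs, purely to illustrate that reshaping; all names are illustrative.

    object PairingSketch extends App {
      // topic -> set of consumer thread ids, in the shape of getConsumerThreadIdsPerTopic
      val consumerThreadIdsPerTopic = Map(
        "clicks" -> Set("group1_consumer1-0", "group1_consumer1-1"),
        "views"  -> Set("group1_consumer1-0"))

      // stand-ins for the real (LinkedBlockingQueue[FetchedDataChunk], KafkaStream) pairs
      val queuesAndStreams = List(("queue-0", "stream-0"), ("queue-1", "stream-1"), ("queue-2", "stream-2"))

      // flatten {topic -> Set(threadId)} into (topic, threadId) pairs
      val topicThreadIds = consumerThreadIdsPerTopic.toList.flatMap {
        case (topic, threadIds) => threadIds.toList.map(threadId => (topic, threadId))
      }

      // static subscription: exactly one (queue, stream) pair per (topic, threadId)
      require(topicThreadIds.size == queuesAndStreams.size,
        "Mismatch between thread ID count and queue count")
      val threadQueueStreamPairs = topicThreadIds.zip(queuesAndStreams)

      // group back by topic to obtain the topic -> List(stream) map handed to the caller
      val topicStreamsMap = threadQueueStreamPairs
        .groupBy { case ((topic, _), _) => topic }
        .map { case (topic, pairs) => topic -> pairs.map { case (_, (_, stream)) => stream } }

      println(topicStreamsMap)  // each topic now maps to its share of the streams
    }

In the wildcard case the patch instead pairs every (topic, thread-id) with every (queue, stream), since the topic set can grow after the streams have been handed out.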
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala Sat Apr  7 00:04:51 2012
@@ -21,11 +21,9 @@ import scala.collection.JavaConversions.
 import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
 import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
-import kafka.server.KafkaServerStartable
-import kafka.common.ConsumerRebalanceFailedException
 
 class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
-    val eventHandler: TopicEventHandler[String], kafkaServerStartable: KafkaServerStartable) extends Logging {
+    val eventHandler: TopicEventHandler[String]) extends Logging {
 
   val lock = new Object()
 
@@ -35,7 +33,7 @@ class ZookeeperTopicEventWatcher(val con
   startWatchingTopicEvents()
 
   private def startWatchingTopicEvents() {
-    val topicEventListener = new ZkTopicEventListener(kafkaServerStartable)
+    val topicEventListener = new ZkTopicEventListener()
     ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.BrokerTopicsPath)
 
     zkClient.subscribeStateChanges(
@@ -52,6 +50,7 @@ class ZookeeperTopicEventWatcher(val con
 
   def shutdown() {
     lock.synchronized {
+      info("Shutting down topic event watcher.")
       if (zkClient != null) {
         stopWatchingTopicEvents()
         zkClient.close()
@@ -62,7 +61,7 @@ class ZookeeperTopicEventWatcher(val con
     }
   }
 
-  class ZkTopicEventListener(val kafkaServerStartable: KafkaServerStartable) extends IZkChildListener {
+  class ZkTopicEventListener extends IZkChildListener {
 
     @throws(classOf[Exception])
     def handleChildChange(parent: String, children: java.util.List[String]) {
@@ -76,11 +75,8 @@ class ZookeeperTopicEventWatcher(val con
           }
         }
         catch {
-          case e: ConsumerRebalanceFailedException =>
-            fatal("can't rebalance in embedded consumer; proceed to shutdown", e)
-            kafkaServerStartable.shutdown()
           case e =>
-            error("error in handling child changes in embedded consumer", e)
+            error("error in handling child changes", e)
         }
       }
     }
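
With the KafkaServerStartable dependency removed, the watcher can be driven by any TopicEventHandler. A minimal sketch follows, assuming only what the diff shows: the two-argument ZookeeperTopicEventWatcher constructor and a handleTopicEvent(Seq[String]) callback. The ConsumerConfig property names and the sleep/shutdown wiring are illustrative, not part of this patch.

    import java.util.Properties
    import kafka.consumer.{ConsumerConfig, TopicEventHandler, ZookeeperTopicEventWatcher}

    object TopicEventWatcherSketch {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("zk.connect", "localhost:2181")   // illustrative settings, not taken from this patch
        props.put("groupid", "topic-watcher-sketch")
        val config = new ConsumerConfig(props)

        // a handler that just logs whatever topic list the watcher reports
        val handler = new TopicEventHandler[String] {
          def handleTopicEvent(allTopics: Seq[String]) {
            println("topics now registered under /brokers/topics: " + allTopics.mkString(", "))
          }
        }

        val watcher = new ZookeeperTopicEventWatcher(config, handler)
        Thread.sleep(60 * 1000)   // observe a few child-change events
        watcher.shutdown()
      }
    }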

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java Sat Apr  7 00:04:51 2012
@@ -17,34 +17,53 @@
 
 package kafka.javaapi.consumer;
 
-import kafka.consumer.KafkaMessageStream;
-import kafka.message.Message;
-import kafka.serializer.Decoder;
 
 import java.util.List;
 import java.util.Map;
+import kafka.consumer.KafkaStream;
+import kafka.consumer.TopicFilter;
+import kafka.message.Message;
+import kafka.serializer.Decoder;
 
 public interface ConsumerConnector {
-    /**
-     *  Create a list of MessageStreams of type T for each topic.
-     *
-     *  @param topicCountMap  a map of (topic, #streams) pair
-     *  @param decoder a decoder that converts from Message to T
-     *  @return a map of (topic, list of  KafkaMessageStream) pair. The number of items in the
-     *          list is #streams. Each KafkaMessageStream supports an iterator of messages.
-     */
-    public <T> Map<String, List<KafkaMessageStream<T>>> createMessageStreams(
-            Map<String, Integer> topicCountMap, Decoder<T> decoder);
-    public Map<String, List<KafkaMessageStream<Message>>> createMessageStreams(
-            Map<String, Integer> topicCountMap);
-
-    /**
-     *  Commit the offsets of all broker partitions connected by this connector.
-     */
-    public void commitOffsets();
-
-    /**
-     *  Shut down the connector
-     */
-    public void shutdown();
+  /**
+   *  Create a list of MessageStreams of type T for each topic.
+   *
+   *  @param topicCountMap  a map of (topic, #streams) pair
+   *  @param decoder a decoder that converts from Message to T
+   *  @return a map of (topic, list of  KafkaStream) pairs.
+   *          The number of items in the list is #streams. Each stream supports
+   *          an iterator over message/metadata pairs.
+   */
+  public <T> Map<String, List<KafkaStream<T>>> createMessageStreams(
+      Map<String, Integer> topicCountMap, Decoder<T> decoder);
+  public Map<String, List<KafkaStream<Message>>> createMessageStreams(
+      Map<String, Integer> topicCountMap);
+
+  /**
+   *  Create a list of MessageAndTopicStreams containing messages of type T.
+   *
+   *  @param topicFilter a TopicFilter that specifies which topics to
+   *                    subscribe to (encapsulates a whitelist or a blacklist).
+   *  @param numStreams the number of message streams to return.
+   *  @param decoder a decoder that converts from Message to T
+   *  @return a list of KafkaStream. Each stream supports an
+   *          iterator over its MessageAndMetadata elements.
+   */
+  public <T> List<KafkaStream<T>> createMessageStreamsByFilter(
+      TopicFilter topicFilter, int numStreams, Decoder<T> decoder);
+  public List<KafkaStream<Message>> createMessageStreamsByFilter(
+      TopicFilter topicFilter, int numStreams);
+  public List<KafkaStream<Message>> createMessageStreamsByFilter(
+      TopicFilter topicFilter);
+
+  /**
+   *  Commit the offsets of all broker partitions connected by this connector.
+   */
+  public void commitOffsets();
+
+  /**
+   *  Shut down the connector
+   */
+  public void shutdown();
 }
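
A minimal sketch of the new filter-based API from the consumer side. It assumes the usual Consumer.create factory and a Whitelist filter implementation, neither of which appears in this excerpt, and it assumes each KafkaStream can be iterated directly over its MessageAndMetadata elements as the javadoc above describes.

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}   // Whitelist assumed; see TopicFilter
    import kafka.serializer.DefaultDecoder

    object FilteredConsumerSketch {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("zk.connect", "localhost:2181")   // illustrative settings
        props.put("groupid", "filtered-consumer-sketch")
        val connector = Consumer.create(new ConsumerConfig(props))   // assumed factory method

        // one stream covering every topic allowed by the filter
        val streams = connector.createMessageStreamsByFilter(new Whitelist("mirror-.*"), 1, new DefaultDecoder)

        // the topic travels with each message, since streams yield MessageAndMetadata elements
        for (stream <- streams; msgAndMetadata <- stream)
          println("received a message from topic " + msgAndMetadata.topic)
      }
    }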

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala Sat Apr  7 00:04:51 2012
@@ -16,9 +16,11 @@
  */
 package kafka.javaapi.consumer
 
-import kafka.consumer.{KafkaMessageStream, ConsumerConfig}
 import kafka.message.Message
 import kafka.serializer.{DefaultDecoder, Decoder}
+import kafka.consumer._
+import scala.collection.JavaConversions.asList
+
 
 /**
  * This class handles the consumers interaction with zookeeper
@@ -68,14 +70,14 @@ private[kafka] class ZookeeperConsumerCo
   def createMessageStreams[T](
         topicCountMap: java.util.Map[String,java.lang.Integer],
         decoder: Decoder[T])
-      : java.util.Map[String,java.util.List[KafkaMessageStream[T]]] = {
+      : java.util.Map[String,java.util.List[KafkaStream[T]]] = {
     import scala.collection.JavaConversions._
 
     val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]])
     val scalaReturn = underlying.consume(scalaTopicCountMap, decoder)
-    val ret = new java.util.HashMap[String,java.util.List[KafkaMessageStream[T]]]
+    val ret = new java.util.HashMap[String,java.util.List[KafkaStream[T]]]
     for ((topic, streams) <- scalaReturn) {
-      var javaStreamList = new java.util.ArrayList[KafkaMessageStream[T]]
+      var javaStreamList = new java.util.ArrayList[KafkaStream[T]]
       for (stream <- streams)
         javaStreamList.add(stream)
       ret.put(topic, javaStreamList)
@@ -85,9 +87,17 @@ private[kafka] class ZookeeperConsumerCo
 
   def createMessageStreams(
         topicCountMap: java.util.Map[String,java.lang.Integer])
-      : java.util.Map[String,java.util.List[KafkaMessageStream[Message]]] =
+      : java.util.Map[String,java.util.List[KafkaStream[Message]]] =
     createMessageStreams(topicCountMap, new DefaultDecoder)
 
+  def createMessageStreamsByFilter[T](topicFilter: TopicFilter, numStreams: Int, decoder: Decoder[T]) =
+    asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, decoder))
+
+  def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) =
+    createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder)
+
+  def createMessageStreamsByFilter(topicFilter: TopicFilter) =
+    createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder)
 
   def commitOffsets() {
     underlying.commitOffsets

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala Sat Apr  7 00:04:51 2012
@@ -17,8 +17,10 @@
 
 package kafka.javaapi.message
 
+
 import kafka.message.{MessageAndOffset, InvalidMessageException}
 
+
 /**
  * A set of messages. A message set has a fixed serialized form, though the container
 * for the bytes could be either in-memory or on disk. The format of each message is

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java Sat Apr  7 00:04:51 2012
@@ -16,9 +16,9 @@
 */
 package kafka.javaapi.producer.async;
 
-import kafka.producer.async.QueueItem;
 
 import java.util.Properties;
+import kafka.producer.async.QueueItem;
 
 /**
  * Callback handler APIs for use in the async producer. The purpose is to

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java Sat Apr  7 00:04:51 2012
@@ -16,12 +16,12 @@
 */
 package kafka.javaapi.producer.async;
 
-import kafka.javaapi.producer.SyncProducer;
-import kafka.producer.async.QueueItem;
-import kafka.serializer.Encoder;
 
 import java.util.List;
 import java.util.Properties;
+import kafka.javaapi.producer.SyncProducer;
+import kafka.producer.async.QueueItem;
+import kafka.serializer.Encoder;
 
 /**
  * Handler that dispatches the batched data from the queue of the

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Sat Apr  7 00:04:51 2012
@@ -36,7 +36,6 @@ import kafka.common.{MessageSizeTooLarge
 class ByteBufferMessageSet(private val buffer: ByteBuffer,
                            private val initialOffset: Long = 0L,
                            private val errorCode: Int = ErrorMapping.NoError) extends MessageSet with Logging {
-  private var validByteCount = -1L
   private var shallowValidByteCount = -1L
   if(sizeInBytes > Int.MaxValue)
     throw new InvalidMessageSizeException("Message set cannot be larger than " + Int.MaxValue)

Added: incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndMetadata.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndMetadata.scala?rev=1310645&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndMetadata.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndMetadata.scala Sat Apr  7 00:04:51 2012
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+case class MessageAndMetadata[T](message: T, topic: String = "")
+

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndOffset.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndOffset.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndOffset.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndOffset.scala Sat Apr  7 00:04:51 2012
@@ -13,11 +13,10 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package kafka.message
 
-/**
- * Represents message and offset of the next message. This is used in the MessageSet to iterate over it
- */
-case class MessageAndOffset(val message: Message, val offset: Long)
+
+case class MessageAndOffset(message: Message, offset: Long)
+

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala Sat Apr  7 00:04:51 2012
@@ -22,9 +22,7 @@ import org.apache.log4j.spi.LoggingEvent
 import org.apache.log4j.AppenderSkeleton
 import org.apache.log4j.helpers.LogLog
 import kafka.utils.Logging
-import kafka.serializer.Encoder
 import java.util.{Properties, Date}
-import kafka.message.Message
 import scala.collection._
 
 class KafkaLog4jAppender extends AppenderSkeleton with Logging {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala?rev=1310645&r1=1310644&r2=1310645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala Sat Apr  7 00:04:51 2012
@@ -22,7 +22,7 @@ import kafka.utils._
 import java.util.Properties
 import kafka.cluster.{Partition, Broker}
 import java.util.concurrent.atomic.AtomicBoolean
-import kafka.common.{NoBrokersForPartitionException, InvalidConfigException, InvalidPartitionException}
+import kafka.common.{NoBrokersForPartitionException, InvalidPartitionException}
 import kafka.api.ProducerRequest
 
 class Producer[K,V](config: ProducerConfig,