Posted to commits@kafka.apache.org by jo...@apache.org on 2012/05/31 03:51:27 UTC

svn commit: r1344526 [1/3] - in /incubator/kafka/branches/0.8: ./ clients/cpp/src/ contrib/hadoop-consumer/src/main/java/kafka/etl/ contrib/hadoop-consumer/src/main/java/kafka/etl/impl/ contrib/hadoop-producer/ contrib/hadoop-producer/src/main/java/kaf...

Author: joestein
Date: Thu May 31 01:51:23 2012
New Revision: 1344526

URL: http://svn.apache.org/viewvc?rev=1344526&view=rev
Log:
KAFKA-348 merge trunk to branch 1239902:1310937 patch by Joe Stein reviewed by Jun Rao

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/KafkaStream.scala
      - copied unchanged from r1343118, incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaStream.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicFilter.scala
      - copied unchanged from r1343118, incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicFilter.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndMetadata.scala
      - copied unchanged from r1343118, incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndMetadata.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
      - copied unchanged from r1343118, incubator/kafka/trunk/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
      - copied unchanged from r1343118, incubator/kafka/trunk/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/MirrorMaker.scala
      - copied unchanged from r1343118, incubator/kafka/trunk/core/src/main/scala/kafka/tools/MirrorMaker.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
      - copied unchanged from r1343118, incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
    incubator/kafka/branches/0.8/system_test/mirror_maker/
      - copied from r1343118, incubator/kafka/trunk/system_test/mirror_maker/
    incubator/kafka/branches/0.8/system_test/mirror_maker/README
      - copied unchanged from r1343118, incubator/kafka/trunk/system_test/mirror_maker/README
    incubator/kafka/branches/0.8/system_test/mirror_maker/bin/
      - copied from r1343118, incubator/kafka/trunk/system_test/mirror_maker/bin/
    incubator/kafka/branches/0.8/system_test/mirror_maker/bin/expected.out
      - copied unchanged from r1343118, incubator/kafka/trunk/system_test/mirror_maker/bin/expected.out
    incubator/kafka/branches/0.8/system_test/mirror_maker/bin/run-test.sh
      - copied unchanged from r1343118, incubator/kafka/trunk/system_test/mirror_maker/bin/run-test.sh
    incubator/kafka/branches/0.8/system_test/mirror_maker/config/
      - copied from r1343118, incubator/kafka/trunk/system_test/mirror_maker/config/
    incubator/kafka/branches/0.8/system_test/mirror_maker/config/blacklisttest.consumer.properties
      - copied unchanged from r1343118, incubator/kafka/trunk/system_test/mirror_maker/config/blacklisttest.consumer.properties
    incubator/kafka/branches/0.8/system_test/mirror_maker/config/mirror_producer.properties
      - copied unchanged from r1343118, incubator/kafka/trunk/system_test/mirror_maker/config/mirror_producer.properties
    incubator/kafka/branches/0.8/system_test/mirror_maker/config/server_source_1_1.properties
      - copied unchanged from r1343118, incubator/kafka/trunk/system_test/mirror_maker/config/server_source_1_1.properties
    incubator/kafka/branches/0.8/system_test/mirror_maker/config/server_source_1_2.properties
      - copied unchanged from r1343118, incubator/kafka/trunk/system_test/mirror_maker/config/server_source_1_2.properties
    incubator/kafka/branches/0.8/system_test/mirror_maker/config/server_source_2_1.properties
      - copied unchanged from r1343118, incubator/kafka/trunk/system_test/mirror_maker/config/server_source_2_1.properties
    incubator/kafka/branches/0.8/system_test/mirror_maker/config/server_source_2_2.properties
      - copied unchanged from r1343118, incubator/kafka/trunk/system_test/mirror_maker/config/server_source_2_2.properties
    incubator/kafka/branches/0.8/system_test/mirror_maker/config/server_target_1_1.properties
      - copied unchanged from r1343118, incubator/kafka/trunk/system_test/mirror_maker/config/server_target_1_1.properties
    incubator/kafka/branches/0.8/system_test/mirror_maker/config/server_target_1_2.properties
      - copied unchanged from r1343118, incubator/kafka/trunk/system_test/mirror_maker/config/server_target_1_2.properties
    incubator/kafka/branches/0.8/system_test/mirror_maker/config/whitelisttest_1.consumer.properties
      - copied unchanged from r1343118, incubator/kafka/trunk/system_test/mirror_maker/config/whitelisttest_1.consumer.properties
    incubator/kafka/branches/0.8/system_test/mirror_maker/config/whitelisttest_2.consumer.properties
      - copied unchanged from r1343118, incubator/kafka/trunk/system_test/mirror_maker/config/whitelisttest_2.consumer.properties
    incubator/kafka/branches/0.8/system_test/mirror_maker/config/zookeeper_source_1.properties
      - copied unchanged from r1343118, incubator/kafka/trunk/system_test/mirror_maker/config/zookeeper_source_1.properties
    incubator/kafka/branches/0.8/system_test/mirror_maker/config/zookeeper_source_2.properties
      - copied unchanged from r1343118, incubator/kafka/trunk/system_test/mirror_maker/config/zookeeper_source_2.properties
    incubator/kafka/branches/0.8/system_test/mirror_maker/config/zookeeper_target.properties
      - copied unchanged from r1343118, incubator/kafka/trunk/system_test/mirror_maker/config/zookeeper_target.properties
Removed:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
    incubator/kafka/branches/0.8/system_test/embedded_consumer/
Modified:
    incubator/kafka/branches/0.8/   (props changed)
    incubator/kafka/branches/0.8/clients/cpp/src/encoder.hpp
    incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
    incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java
    incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java
    incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java
    incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java
    incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
    incubator/kafka/branches/0.8/contrib/hadoop-producer/README.md
    incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
    incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
    incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
    incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/Fetcher.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionUtils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/InvalidMessageException.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndOffset.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerShell.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
    incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
    incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Consumer.java
    incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/ExampleUtils.java
    incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Producer.java
    incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
    incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
    incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/PerfConfig.scala
    incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
    incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh
    incubator/kafka/branches/0.8/system_test/broker_failure/config/log4j.properties
    incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer1.properties
    incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer2.properties
    incubator/kafka/branches/0.8/system_test/broker_failure/config/mirror_producer3.properties
    incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source1.properties
    incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source2.properties
    incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source3.properties
    incubator/kafka/branches/0.8/system_test/broker_failure/config/server_source4.properties
    incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target1.properties
    incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target2.properties
    incubator/kafka/branches/0.8/system_test/broker_failure/config/server_target3.properties
    incubator/kafka/branches/0.8/system_test/broker_failure/config/whitelisttest.consumer.properties

Propchange: incubator/kafka/branches/0.8/
------------------------------------------------------------------------------
  Merged /incubator/kafka/trunk:r1239903-1343118

Modified: incubator/kafka/branches/0.8/clients/cpp/src/encoder.hpp
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/clients/cpp/src/encoder.hpp?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/clients/cpp/src/encoder.hpp (original)
+++ incubator/kafka/branches/0.8/clients/cpp/src/encoder.hpp Thu May 31 01:51:23 2012
@@ -16,7 +16,7 @@
 */
 /*
  * encoder.hpp
- *
+ */
 
 #ifndef KAFKA_ENCODER_HPP_
 #define KAFKA_ENCODER_HPP_

Modified: incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java Thu May 31 01:51:23 2012
@@ -16,6 +16,11 @@
  */
 package kafka.etl;
 
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
 import kafka.api.FetchRequest;
 import kafka.api.FetchRequestBuilder;
 import kafka.api.OffsetRequest;
@@ -23,7 +28,6 @@ import kafka.common.ErrorMapping;
 import kafka.javaapi.FetchResponse;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.javaapi.message.MessageSet;
 import kafka.message.MessageAndOffset;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
@@ -31,12 +35,6 @@ import org.apache.hadoop.mapred.OutputCo
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.lib.MultipleOutputs;
 
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-
 @SuppressWarnings({ "deprecation"})
 public class KafkaETLContext {
     
@@ -139,7 +137,7 @@ public class KafkaETLContext {
             while ( !gotNext && _respIterator.hasNext()) {
                 ByteBufferMessageSet msgSet = _respIterator.next();
                 if ( hasError(msgSet)) return false;
-                _messageIt =  (Iterator<MessageAndOffset>) msgSet.iterator();
+                _messageIt = msgSet.iterator();
                 gotNext = get(key, value);
             }
         }
@@ -194,17 +192,17 @@ public class KafkaETLContext {
     
     protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
         if (_messageIt != null && _messageIt.hasNext()) {
-            MessageAndOffset msgAndOffset = _messageIt.next();
+            MessageAndOffset messageAndOffset = _messageIt.next();
             
-            ByteBuffer buf = msgAndOffset.message().payload();
+            ByteBuffer buf = messageAndOffset.message().payload();
             int origSize = buf.remaining();
             byte[] bytes = new byte[origSize];
-            buf.get(bytes, buf.position(), origSize);
+          buf.get(bytes, buf.position(), origSize);
             value.set(bytes, 0, origSize);
             
-            key.set(_index, _offset, msgAndOffset.message().checksum());
+            key.set(_index, _offset, messageAndOffset.message().checksum());
             
-            _offset = msgAndOffset.offset();  //increase offset
+            _offset = messageAndOffset.offset();  //increase offset
             _count ++;  //increase count
             
             return true;

Modified: incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java Thu May 31 01:51:23 2012
@@ -16,6 +16,7 @@
  */
 package kafka.etl;
 
+
 import java.io.IOException;
 import java.net.URI;
 import java.util.Map;
@@ -23,13 +24,13 @@ import kafka.consumer.SimpleConsumer;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.lib.MultipleOutputs;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
 
 
 @SuppressWarnings("deprecation")

Modified: incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java Thu May 31 01:51:23 2012
@@ -16,13 +16,13 @@
  */
 package kafka.etl;
 
+
 import java.net.URI;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.lib.MultipleOutputs;

Modified: incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java Thu May 31 01:51:23 2012
@@ -16,11 +16,11 @@
  */
 package kafka.etl;
 
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import org.apache.hadoop.io.WritableComparable;
-import kafka.etl.KafkaETLKey;
 
 public class KafkaETLKey implements WritableComparable<KafkaETLKey>{
 

Modified: incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java Thu May 31 01:51:23 2012
@@ -17,6 +17,7 @@
 
 package kafka.etl;
 
+
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -33,7 +34,6 @@ import java.util.Arrays;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Properties;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;

Modified: incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java Thu May 31 01:51:23 2012
@@ -17,32 +17,24 @@
 
 package kafka.etl.impl;
 
-import java.io.IOException;
+
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Random;
-import java.util.Map.Entry;
 import java.util.Properties;
-
-import kafka.message.NoCompressionCodec;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.JobConf;
-
+import java.util.Random;
 import kafka.etl.KafkaETLKey;
 import kafka.etl.KafkaETLRequest;
-import kafka.etl.KafkaETLUtils;
 import kafka.etl.Props;
 import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
 import kafka.javaapi.producer.SyncProducer;
+import kafka.message.Message;
 import kafka.producer.SyncProducerConfig;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
 
 /**
  * Use this class to produce test events to Kafka server. Each event contains a

Modified: incubator/kafka/branches/0.8/contrib/hadoop-producer/README.md
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-producer/README.md?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-producer/README.md (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-producer/README.md Thu May 31 01:51:23 2012
@@ -1,6 +1,14 @@
 Hadoop to Kafka Bridge
 ======================
 
+What's new?
+-----------
+
+* Now supports Kafka's software load balancer (Kafka URIs are specified with
+  kafka+zk as the scheme, as described below)
+* Supports Kafka 0.7. Now uses the new Producer API, rather than the legacy
+  SyncProducer.
+
 What is it?
 -----------
 
@@ -17,8 +25,10 @@ multiple times in the same push.
 How do I use it?
 ----------------
 
-With this bridge, Kafka topics are URIs and are specified as
-`kafka://<kafka-server>/<kafka-topic>`.
+With this bridge, Kafka topics are URIs and are specified in one of two
+formats: `kafka+zk://<zk-path>#<kafka-topic>`, which uses the software load
+balancer, or the legacy `kafka://<kafka-server>/<kafka-topic>` to connect to a
+specific Kafka broker.
 
 ### Pig ###
 
@@ -27,17 +37,19 @@ row. To push data via Kafka, store to th
 with the Avro schema as its first argument. You'll need to register the
 appropriate Kafka JARs. Here is what an example Pig script looks like:
 
-    REGISTER hadoop-kafka-bridge-0.5.2.jar;
+    REGISTER hadoop-producer_2.8.0-0.7.0.jar;
     REGISTER avro-1.4.0.jar;
     REGISTER piggybank.jar;
-    REGISTER kafka-0.5.2.jar;
+    REGISTER kafka-0.7.0.jar;
     REGISTER jackson-core-asl-1.5.5.jar;
     REGISTER jackson-mapper-asl-1.5.5.jar;
+    REGISTER zkclient-20110412.jar;
+    REGISTER zookeeper-3.3.4.jar;
     REGISTER scala-library.jar;
 
     member_info = LOAD 'member_info.tsv' as (member_id : int, name : chararray);
     names = FOREACH member_info GENERATE name;
-    STORE member_info INTO 'kafka://my-broker:9092/member_info' USING kafka.bridge.AvroKafkaStorage('"string"');
+    STORE member_info INTO 'kafka+zk://my-zookeeper:2181/kafka#member_info' USING kafka.bridge.AvroKafkaStorage('"string"');
 
 That's it! The Pig StoreFunc makes use of AvroStorage in Piggybank to convert
 from Pig's data model to the specified Avro schema.
@@ -46,8 +58,8 @@ Further, multi-store is possible with Ka
 multiple topics and brokers in the same job:
 
     SPLIT member_info INTO early_adopters IF member_id < 1000, others IF member_id >= 1000;
-    STORE early_adopters INTO 'kafka://my-broker:9092/early_adopters' USING AvroKafkaStorage('$schema');
-    STORE others INTO 'kafka://my-broker:9092/others' USING AvroKafkaStorage('$schema');
+    STORE early_adopters INTO 'kafka+zk://my-zookeeper:2181/kafka#early_adopters' USING AvroKafkaStorage('$schema');
+    STORE others INTO 'kafka://my-broker:9092,my-broker2:9092/others' USING AvroKafkaStorage('$schema');
 
 ### KafkaOutputFormat ###
 
@@ -126,9 +138,10 @@ Normally, you needn't change any of thes
   docs). Default is 64*1024 (64KB). 
 * kafka.output.max_msgsize: Maximum message size in bytes (see Kafka producer
   docs). Default is 1024*1024 (1MB).
+* kafka.output.compression_codec: The compression codec to use (see Kafka producer
+  docs). Default is 0 (no compression).
 
-For easier debugging, the above values as well as the Kafka URI
-(kafka.output.url), the output server (kafka.output.server), the topic
-(kafka.output.topic), and the schema (kafka.output.schema) are injected into
-the job's configuration.
+For easier debugging, the above values as well as the Kafka broker information
+(either kafka.zk.connect or kafka.broker.list), the topic (kafka.output.topic),
+and the schema (kafka.output.schema) are injected into the job's configuration.
 

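The kafka.output.* tunables described in the README above are ordinary Hadoop job configuration entries; a minimal Scala sketch of overriding two of them follows (the values are illustrative examples only, not recommendations from this patch):

    import org.apache.hadoop.conf.Configuration

    val conf = new Configuration()
    // Buffer roughly 16MB of messages in the record writer before each send (the default is 10MB).
    conf.setInt("kafka.output.queue_size", 16 * 1024 * 1024)
    // The compression knob added in this change; 0 keeps the default of no compression.
    conf.setInt("kafka.output.compression_codec", 0)
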
Modified: incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java Thu May 31 01:51:23 2012
@@ -16,8 +16,9 @@
  */
 package kafka.bridge.examples;
 
-import kafka.bridge.hadoop.KafkaOutputFormat;
 
+import java.io.IOException;
+import kafka.bridge.hadoop.KafkaOutputFormat;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -27,8 +28,6 @@ import org.apache.hadoop.mapreduce.Mappe
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 
-import java.io.IOException;
-
 public class TextPublisher
 {
   public static void main(String[] args) throws Exception

Modified: incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java Thu May 31 01:51:23 2012
@@ -16,29 +16,46 @@
  */
 package kafka.bridge.hadoop;
 
-import java.util.Properties;
-
-import kafka.javaapi.producer.SyncProducer;
-import kafka.producer.SyncProducerConfig;
 
+import java.io.IOException;
+import java.net.URI;
+import java.util.Properties;
+import kafka.javaapi.producer.Producer;
+import kafka.message.Message;
+import kafka.producer.ProducerConfig;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-
-import java.io.IOException;
-import java.net.URI;
+import org.apache.log4j.Logger;
 
 public class KafkaOutputFormat<W extends BytesWritable> extends OutputFormat<NullWritable, W>
 {
+  private Logger log = Logger.getLogger(KafkaOutputFormat.class);
+
   public static final String KAFKA_URL = "kafka.output.url";
+  /** Bytes to buffer before the OutputFormat does a send */
+  public static final int KAFKA_QUEUE_SIZE = 10*1024*1024;
+
+  /** Default value for Kafka's connect.timeout.ms */
   public static final int KAFKA_PRODUCER_CONNECT_TIMEOUT = 30*1000;
+  /** Default value for Kafka's reconnect.interval*/
   public static final int KAFKA_PRODUCER_RECONNECT_INTERVAL = 1000;
+  /** Default value for Kafka's buffer.size */
   public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64*1024;
+  /** Default value for Kafka's max.message.size */
   public static final int KAFKA_PRODUCER_MAX_MESSAGE_SIZE = 1024*1024;
-  public static final int KAFKA_QUEUE_SIZE = 10*1024*1024;
+  /** Default value for Kafka's producer.type */
+  public static final String KAFKA_PRODUCER_PRODUCER_TYPE = "sync";
+  /** Default value for Kafka's compression.codec */
+  public static final int KAFKA_PRODUCER_COMPRESSION_CODEC = 0;
 
   public KafkaOutputFormat()
   {
@@ -77,40 +94,80 @@ public class KafkaOutputFormat<W extends
     Path outputPath = getOutputPath(context);
     if (outputPath == null)
       throw new IllegalArgumentException("no kafka output url specified");
-    URI uri = outputPath.toUri();
+    URI uri = URI.create(outputPath.toString());
     Configuration job = context.getConfiguration();
 
-    final String topic = uri.getPath().substring(1);        // ignore the initial '/' in the path
+    Properties props = new Properties();
+    String topic;
 
     final int queueSize = job.getInt("kafka.output.queue_size", KAFKA_QUEUE_SIZE);
     final int timeout = job.getInt("kafka.output.connect_timeout", KAFKA_PRODUCER_CONNECT_TIMEOUT);
     final int interval = job.getInt("kafka.output.reconnect_interval", KAFKA_PRODUCER_RECONNECT_INTERVAL);
     final int bufSize = job.getInt("kafka.output.bufsize", KAFKA_PRODUCER_BUFFER_SIZE);
     final int maxSize = job.getInt("kafka.output.max_msgsize", KAFKA_PRODUCER_MAX_MESSAGE_SIZE);
+    final String producerType = job.get("kafka.output.producer_type", KAFKA_PRODUCER_PRODUCER_TYPE);
+    final int compressionCodec = job.getInt("kafka.output.compression_codec", KAFKA_PRODUCER_COMPRESSION_CODEC);
 
-    job.set("kafka.output.server", String.format("%s:%d", uri.getHost(), uri.getPort()));
-    job.set("kafka.output.topic", topic);
     job.setInt("kafka.output.queue_size", queueSize);
     job.setInt("kafka.output.connect_timeout", timeout);
     job.setInt("kafka.output.reconnect_interval", interval);
     job.setInt("kafka.output.bufsize", bufSize);
     job.setInt("kafka.output.max_msgsize", maxSize);
+    job.set("kafka.output.producer_type", producerType);
+    job.setInt("kafka.output.compression_codec", compressionCodec);
 
-    if (uri.getHost().isEmpty())
-      throw new IllegalArgumentException("missing kafka server");
-    if (uri.getPath().isEmpty())
-      throw new IllegalArgumentException("missing kafka topic");
-
-    Properties props = new Properties();
-    props.setProperty("host", uri.getHost());
-    props.setProperty("port", Integer.toString(uri.getPort()));
+    props.setProperty("producer.type", producerType);
     props.setProperty("buffer.size", Integer.toString(bufSize));
     props.setProperty("connect.timeout.ms", Integer.toString(timeout));
     props.setProperty("reconnect.interval", Integer.toString(interval));
     props.setProperty("max.message.size", Integer.toString(maxSize));
+    props.setProperty("compression.codec", Integer.toString(compressionCodec));
 
-    SyncProducer producer = new SyncProducer(new SyncProducerConfig(props));
+    if (uri.getScheme().equals("kafka+zk")) {
+      // Software load balancer:
+      //  URL: kafka+zk://<zk connect path>#<kafka topic>
+      //  e.g. kafka+zk://kafka-zk:2181/kafka#foobar
+
+      String zkConnect = uri.getAuthority() + uri.getPath();
+
+      props.setProperty("zk.connect", zkConnect);
+      job.set("kafka.zk.connect", zkConnect);
+
+      topic = uri.getFragment();
+      if (topic == null)
+        throw new IllegalArgumentException("no topic specified in kafka uri fragment");
+
+      log.info(String.format("using kafka zk.connect %s (topic %s)", zkConnect, topic));
+    } else if (uri.getScheme().equals("kafka")) {
+      // using the legacy direct broker list
+      // URL: kafka://<kafka host>/<topic>
+      // e.g. kafka://kafka-server:9000,kafka-server2:9000/foobar
+
+      // Just enumerate broker_ids, as it really doesn't matter what they are as long as they're unique
+      // (KAFKA-258 will remove the broker_id requirement)
+      StringBuilder brokerListBuilder = new StringBuilder();
+      String delim = "";
+      int brokerId = 0;
+      for (String serverPort : uri.getAuthority().split(",")) {
+        brokerListBuilder.append(delim).append(String.format("%d:%s", brokerId, serverPort));
+        delim = ",";
+        brokerId++;
+      }
+      String brokerList = brokerListBuilder.toString();
+
+      props.setProperty("broker.list", brokerList);
+      job.set("kafka.broker.list", brokerList);
+
+      if (uri.getPath() == null || uri.getPath().length() <= 1)
+        throw new IllegalArgumentException("no topic specified in kafka uri");
+
+      topic = uri.getPath().substring(1);             // ignore the initial '/' in the path
+      job.set("kafka.output.topic", topic);
+      log.info(String.format("using kafka broker %s (topic %s)", brokerList, topic));
+    } else
+      throw new IllegalArgumentException("missing scheme from kafka uri (must be kafka:// or kafka+zk://)");
+
+    Producer<Integer, Message> producer = new Producer<Integer, Message>(new ProducerConfig(props));
     return new KafkaRecordWriter<W>(producer, topic, queueSize);
   }
 }
-

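As a standalone illustration of the kafka:// branch above: the broker list is built by assigning synthetic, unique broker ids to each host:port in the URI authority (the in-code comment notes KAFKA-258 should remove that requirement). A minimal Scala sketch of the same mapping:

    // Mirrors the loop in getRecordWriter:
    // "kafka-server:9000,kafka-server2:9000" -> "0:kafka-server:9000,1:kafka-server2:9000"
    def brokerList(authority: String): String =
      authority.split(",").zipWithIndex
               .map { case (hostPort, id) => "%d:%s".format(id, hostPort) }
               .mkString(",")
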
Modified: incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java Thu May 31 01:51:23 2012
@@ -16,30 +16,28 @@
  */
 package kafka.bridge.hadoop;
 
-import kafka.message.Message;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.javaapi.producer.SyncProducer;
 
-import kafka.message.NoCompressionCodec;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import kafka.javaapi.producer.Producer;
+import kafka.javaapi.producer.ProducerData;
+import kafka.message.Message;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
 public class KafkaRecordWriter<W extends BytesWritable> extends RecordWriter<NullWritable, W>
 {
-  protected SyncProducer producer;
+  protected Producer<Integer, Message> producer;
   protected String topic;
 
-  protected List<Message> msgList = new ArrayList<Message>();
+  protected List<ProducerData<Integer, Message>> msgList = new LinkedList<ProducerData<Integer, Message>>();
   protected int totalSize = 0;
   protected int queueSize;
 
-  public KafkaRecordWriter(SyncProducer producer, String topic, int queueSize)
+  public KafkaRecordWriter(Producer<Integer, Message> producer, String topic, int queueSize)
   {
     this.producer = producer;
     this.topic = topic;
@@ -49,8 +47,7 @@ public class KafkaRecordWriter<W extends
   protected void sendMsgList()
   {
     if (msgList.size() > 0) {
-      ByteBufferMessageSet msgSet = new ByteBufferMessageSet(kafka.message.NoCompressionCodec$.MODULE$, msgList);
-      producer.send(topic, msgSet);
+      producer.send(msgList);
       msgList.clear();
       totalSize = 0;
     }
@@ -60,10 +57,11 @@ public class KafkaRecordWriter<W extends
   public void write(NullWritable key, BytesWritable value) throws IOException, InterruptedException
   {
     Message msg = new Message(value.getBytes());
-    msgList.add(msg);
+    msgList.add(new ProducerData<Integer, Message>(this.topic, msg));
     totalSize += msg.size();
 
-    if (totalSize > queueSize)
+    // MultiProducerRequest only supports sending up to Short.MAX_VALUE messages in one batch
+    if (totalSize > queueSize || msgList.size() >= Short.MAX_VALUE)
       sendMsgList();
   }
 

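For context on the producer path the rewritten KafkaRecordWriter now uses, here is a minimal Scala sketch of sending one message through kafka.javaapi.producer.Producer (the ZooKeeper address and topic name are placeholders):

    import java.util.Properties
    import kafka.javaapi.producer.{Producer, ProducerData}
    import kafka.message.Message
    import kafka.producer.ProducerConfig

    val props = new Properties()
    props.put("zk.connect", "localhost:2181")   // or "broker.list" for the legacy kafka:// form
    val producer = new Producer[Integer, Message](new ProducerConfig(props))
    // ProducerData carries the target topic with each message, so one producer can serve many topics.
    producer.send(new ProducerData[Integer, Message]("my-topic", new Message("hello".getBytes)))
    producer.close()
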
Modified: incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java Thu May 31 01:51:23 2012
@@ -16,9 +16,12 @@
  */
 package kafka.bridge.pig;
 
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
 import kafka.bridge.hadoop.KafkaOutputFormat;
 import kafka.bridge.hadoop.KafkaRecordWriter;
-
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.Encoder;
 import org.apache.hadoop.fs.Path;
@@ -33,10 +36,6 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.piggybank.storage.avro.PigAvroDatumWriter;
 import org.apache.pig.piggybank.storage.avro.PigSchema2Avro;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
 public class AvroKafkaStorage extends StoreFunc
 {
   protected KafkaRecordWriter writer;

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala Thu May 31 01:51:23 2012
@@ -17,8 +17,6 @@
 
 package kafka
 
-import consumer.ConsumerConfig
-import producer.ProducerConfig
 import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
 import utils.{Utils, Logging}
 import org.apache.log4j.jmx.LoggerDynamicMBean
@@ -30,8 +28,8 @@ object Kafka extends Logging {
     import org.apache.log4j.Logger
     Utils.registerMBean(new LoggerDynamicMBean(Logger.getRootLogger()), kafkaLog4jMBeanName)
 
-    if (!List(1, 3).contains(args.length)) {
-      println("USAGE: java [options] %s server.properties [consumer.properties producer.properties]".format(classOf[KafkaServer].getSimpleName()))
+    if (args.length != 1) {
+      println("USAGE: java [options] %s server.properties".format(classOf[KafkaServer].getSimpleName()))
       System.exit(1)
     }
   
@@ -39,14 +37,7 @@ object Kafka extends Logging {
       val props = Utils.loadProps(args(0))
       val serverConfig = new KafkaConfig(props)
 
-      val kafkaServerStartble = args.length match {
-        case 3 =>
-          val consumerConfig = new ConsumerConfig(Utils.loadProps(args(1)))
-          val producerConfig = new ProducerConfig(Utils.loadProps(args(2)))
-          new KafkaServerStartable(serverConfig, consumerConfig, producerConfig)
-        case 1 =>
-          new KafkaServerStartable(serverConfig)
-      }
+      val kafkaServerStartble = new KafkaServerStartable(serverConfig)
 
       // attach shutdown handler to catch control-c
       Runtime.getRuntime().addShutdownHook(new Thread() {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Thu May 31 01:51:23 2012
@@ -36,11 +36,19 @@ object ConsoleConsumer extends Logging {
 
   def main(args: Array[String]) {
     val parser = new OptionParser
-    val topicIdOpt = parser.accepts("topic", "REQUIRED: The topic id to consume on.")
+    val topicIdOpt = parser.accepts("topic", "The topic id to consume on.")
                            .withRequiredArg
                            .describedAs("topic")
                            .ofType(classOf[String])
-    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + 
+    val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.")
+                             .withRequiredArg
+                             .describedAs("whitelist")
+                             .ofType(classOf[String])
+    val blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to exclude from consumption.")
+                             .withRequiredArg
+                             .describedAs("blacklist")
+                             .ofType(classOf[String])
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
                                       "Multiple URLS can be given to allow fail-over.")
                            .withRequiredArg
                            .describedAs("urls")
@@ -90,8 +98,20 @@ object ConsoleConsumer extends Logging {
         "skip it instead of halt.")
 
     val options: OptionSet = tryParse(parser, args)
-    checkRequiredArgs(parser, options, topicIdOpt, zkConnectOpt)
+    Utils.checkRequiredArgs(parser, options, zkConnectOpt)
     
+    val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
+    if (topicOrFilterOpt.size != 1) {
+      error("Exactly one of whitelist/blacklist/topic is required.")
+      parser.printHelpOn(System.err)
+      System.exit(1)
+    }
+    val topicArg = options.valueOf(topicOrFilterOpt.head)
+    val filterSpec = if (options.has(blacklistOpt))
+      new Blacklist(topicArg)
+    else
+      new Whitelist(topicArg)
+
     val props = new Properties()
     props.put("groupid", options.valueOf(groupIdOpt))
     props.put("socket.buffersize", options.valueOf(socketBufferSizeOpt).toString)
@@ -104,7 +124,6 @@ object ConsoleConsumer extends Logging {
     val config = new ConsumerConfig(props)
     val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
     
-    val topic = options.valueOf(topicIdOpt)
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
     val formatterArgs = tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))
     
@@ -123,21 +142,20 @@ object ConsoleConsumer extends Logging {
           tryCleanupZookeeper(options.valueOf(zkConnectOpt), options.valueOf(groupIdOpt))
       }
     })
-    
-    var stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).get.get(0)
-    val iter =
-      if(maxMessages >= 0)
-        stream.slice(0, maxMessages)
-      else
-        stream
+
+    val stream = connector.createMessageStreamsByFilter(filterSpec).get(0)
+    val iter = if(maxMessages >= 0)
+      stream.slice(0, maxMessages)
+    else
+      stream
 
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
     formatter.init(formatterArgs)
 
     try {
-      for(message <- iter) {
+      for(messageAndTopic <- iter) {
         try {
-          formatter.writeTo(message, System.out)
+          formatter.writeTo(messageAndTopic.message, System.out)
         } catch {
           case e =>
             if (skipMessageOnError)
@@ -173,16 +191,6 @@ object ConsoleConsumer extends Logging {
     }
   }
   
-  def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
-    for(arg <- required) {
-      if(!options.has(arg)) {
-        error("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
-  }
-  
   def tryParseFormatterArgs(args: Iterable[String]): Properties = {
     val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
     if(!splits.forall(_.length == 2)) {
@@ -210,9 +218,19 @@ object ConsoleConsumer extends Logging {
   }
 
   class ChecksumMessageFormatter extends MessageFormatter {
+    private var topicStr: String = _
+    
+    override def init(props: Properties) {
+      topicStr = props.getProperty("topic")
+      if (topicStr != null) 
+        topicStr = topicStr + "-"
+      else
+        topicStr = ""
+    }
+    
     def writeTo(message: Message, output: PrintStream) {
       val chksum = message.checksum
-      output.println("checksum:" + chksum)
+      output.println(topicStr + "checksum:" + chksum)
     }
   }
   

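Per the option changes above, the console consumer now requires exactly one of --topic, --whitelist, or --blacklist. A minimal Scala sketch of an invocation (the ZooKeeper address and topic pattern are placeholders, and the call blocks consuming until interrupted):

    import kafka.consumer.ConsoleConsumer

    // Consume every topic whose name matches the whitelist pattern.
    ConsoleConsumer.main(Array("--zookeeper", "localhost:2181", "--whitelist", "mytopic.*"))
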
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala Thu May 31 01:51:23 2012
@@ -20,7 +20,6 @@ package kafka.consumer
 import java.util.Properties
 import kafka.utils.{ZKConfig, Utils}
 import kafka.api.OffsetRequest
-import kafka.common.InvalidConfigException
 object ConsumerConfig {
   val SocketTimeout = 30 * 1000
   val SocketBufferSize = 64*1024
@@ -29,7 +28,7 @@ object ConsumerConfig {
   val DefaultFetcherBackoffMs = 1000
   val AutoCommit = true
   val AutoCommitInterval = 10 * 1000
-  val MaxQueuedChunks = 100
+  val MaxQueuedChunks = 10
   val MaxRebalanceRetries = 4
   val AutoOffsetReset = OffsetRequest.SmallestTimeString
   val ConsumerTimeoutMs = -1
@@ -93,20 +92,10 @@ class ConsumerConfig(props: Properties) 
   /** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
   val consumerTimeoutMs = Utils.getInt(props, "consumer.timeout.ms", ConsumerTimeoutMs)
 
-  /** Whitelist of topics for this mirror's embedded consumer to consume. At
-   *  most one of whitelist/blacklist may be specified. */
-  val mirrorTopicsWhitelist = Utils.getString(
-    props, MirrorTopicsWhitelistProp, MirrorTopicsWhitelist)
- 
-  /** Topics to skip mirroring. At most one of whitelist/blacklist may be
-   *  specified */
-  val mirrorTopicsBlackList = Utils.getString(
-    props, MirrorTopicsBlacklistProp, MirrorTopicsBlacklist)
-
-  if (mirrorTopicsWhitelist.nonEmpty && mirrorTopicsBlackList.nonEmpty)
-      throw new InvalidConfigException("The embedded consumer's mirror topics configuration can only contain one of blacklist or whitelist")
-
-  val mirrorConsumerNumThreads = Utils.getInt(
-    props, MirrorConsumerNumThreadsProp, MirrorConsumerNumThreads)
+  /** Use shallow iterator over compressed messages directly. This feature should be used very carefully.
+   *  Typically, it's only used for mirroring raw messages from one kafka cluster to another to save the
+   *  overhead of decompression.
+   *  */
+  val enableShallowIterator = Utils.getBoolean(props, "shallowiterator.enable", false)
 }
 

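Alongside the removal of the embedded-consumer mirroring properties, the config gains a shallowiterator.enable switch; a minimal Scala sketch of a consumer config that turns it on (connection and group values are placeholders):

    import java.util.Properties
    import kafka.consumer.ConsumerConfig

    val props = new Properties()
    props.put("zk.connect", "localhost:2181")
    props.put("groupid", "mirror-group")
    // Hand compressed message sets through as-is instead of decompressing them;
    // intended mainly for mirroring raw data from one cluster to another.
    props.put("shallowiterator.enable", "true")
    val config = new ConsumerConfig(props)
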
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala Thu May 31 01:51:23 2012
@@ -29,12 +29,28 @@ trait ConsumerConnector {
    *  Create a list of MessageStreams for each topic.
    *
    *  @param topicCountMap  a map of (topic, #streams) pair
-   *  @return a map of (topic, list of  KafkaMessageStream) pair. The number of items in the
-   *          list is #streams. Each KafkaMessageStream supports an iterator of messages.
+   *  @param decoder Decoder to decode each Message to type T
+   *  @return a map of (topic, list of  KafkaStream) pairs.
+   *          The number of items in the list is #streams. Each stream supports
+   *          an iterator over message/metadata pairs.
    */
   def createMessageStreams[T](topicCountMap: Map[String,Int],
                               decoder: Decoder[T] = new DefaultDecoder)
-    : Map[String,List[KafkaMessageStream[T]]]
+    : Map[String,List[KafkaStream[T]]]
+
+  /**
+   *  Create a list of message streams for all topics that match a given filter.
+   *
+   *  @param topicFilter Either a Whitelist or Blacklist TopicFilter object.
+   *  @param numStreams Number of streams to return
+   *  @param decoder Decoder to decode each Message to type T
+   *  @return a list of KafkaStream each of which provides an
+   *          iterator over message/metadata pairs over allowed topics.
+   */
+  def createMessageStreamsByFilter[T](topicFilter: TopicFilter,
+                                      numStreams: Int = 1,
+                                      decoder: Decoder[T] = new DefaultDecoder)
+    : Seq[KafkaStream[T]]
 
   /**
    *  Commit the offsets of all broker partitions connected by this connector.

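A minimal Scala sketch of the new filter-based API declared above (ZooKeeper address, group id, and topic pattern are placeholders; iteration blocks until messages arrive unless consumer.timeout.ms is set):

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}
    import kafka.serializer.DefaultDecoder

    val props = new Properties()
    props.put("zk.connect", "localhost:2181")
    props.put("groupid", "filter-demo")
    val connector = Consumer.create(new ConsumerConfig(props))

    // One stream over every topic matching the whitelist; each element is a MessageAndMetadata.
    val streams = connector.createMessageStreamsByFilter(new Whitelist("mytopic.*"), 1, new DefaultDecoder)
    for (msgAndMeta <- streams.head)
      println("topic=%s checksum=%d".format(msgAndMeta.topic, msgAndMeta.message.checksum))
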
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Thu May 31 01:51:23 2012
@@ -19,36 +19,38 @@ package kafka.consumer
 
 import kafka.utils.{IteratorTemplate, Logging}
 import java.util.concurrent.{TimeUnit, BlockingQueue}
-import kafka.message.MessageAndOffset
 import kafka.serializer.Decoder
 import java.util.concurrent.atomic.AtomicReference
+import kafka.message.{MessageAndOffset, MessageAndMetadata}
+
 
 /**
  * An iterator that blocks until a value can be read from the supplied queue.
  * The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
  *
  */
-class ConsumerIterator[T](private val topic: String,
-                          private val channel: BlockingQueue[FetchedDataChunk],
+class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk],
                           consumerTimeoutMs: Int,
-                          private val decoder: Decoder[T])
-  extends IteratorTemplate[T] with Logging {
+                          private val decoder: Decoder[T],
+                          val enableShallowIterator: Boolean)
+  extends IteratorTemplate[MessageAndMetadata[T]] with Logging {
 
   private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
   private var currentTopicInfo:PartitionTopicInfo = null
   private var consumedOffset: Long = -1L
 
-  override def next(): T = {
-    val decodedMessage = super.next()
+  override def next(): MessageAndMetadata[T] = {
+    val item = super.next()
     if(consumedOffset < 0)
       throw new IllegalStateException("Offset returned by the message set is invalid %d".format(consumedOffset))
     currentTopicInfo.resetConsumeOffset(consumedOffset)
-    trace("Setting consumed offset to %d".format(consumedOffset))
+    val topic = currentTopicInfo.topic
+    trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
     ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1)
-    decodedMessage
+    item
   }
 
-  protected def makeNext(): T = {
+  protected def makeNext(): MessageAndMetadata[T] = {
     var currentDataChunk: FetchedDataChunk = null
     // if we don't have an iterator, get one
     var localCurrent = current.get()
@@ -74,16 +76,18 @@ class ConsumerIterator[T](private val to
                         .format(currentTopicInfo.getConsumeOffset, currentDataChunk.fetchOffset, currentTopicInfo))
           currentTopicInfo.resetConsumeOffset(currentDataChunk.fetchOffset)
         }
-        localCurrent = currentDataChunk.messages.iterator
+        localCurrent = if (enableShallowIterator) currentDataChunk.messages.shallowIterator
+                       else currentDataChunk.messages.iterator
         current.set(localCurrent)
       }
     }
     val item = localCurrent.next()
     consumedOffset = item.offset
-    decoder.toEvent(item.message)
+
+    new MessageAndMetadata(decoder.toEvent(item.message), currentTopicInfo.topic)
   }
 
-  def clearCurrentChunk() = {
+  def clearCurrentChunk() {
     try {
       info("Clearing the current data chunk for this consumer iterator")
       current.set(null)
@@ -92,3 +96,4 @@ class ConsumerIterator[T](private val to
 }
 
 class ConsumerTimeoutException() extends RuntimeException()
+

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/Fetcher.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/Fetcher.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/Fetcher.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/Fetcher.scala Thu May 31 01:51:23 2012
@@ -42,24 +42,24 @@ private [consumer] class Fetcher(val con
     fetcherThreads = EMPTY_FETCHER_THREADS
   }
 
-  def clearFetcherQueues[T](topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
+  def clearFetcherQueues(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
                             queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]],
-                            kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) {
+                            messageStreams: Map[String,List[KafkaStream[_]]]) {
 
     // Clear all but the currently iterated upon chunk in the consumer thread's queue
     queuesTobeCleared.foreach(_.clear)
     info("Cleared all relevant queues for this fetcher")
 
     // Also clear the currently iterated upon chunk in the consumer threads
-    if(kafkaMessageStreams != null)
-       kafkaMessageStreams.foreach(_._2.foreach(s => s.clear()))
+    if(messageStreams != null)
+       messageStreams.foreach(_._2.foreach(s => s.clear()))
 
     info("Cleared the data chunks in all the consumer message iterators")
 
   }
 
-  def startConnections[T](topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
-                            kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) {
+  def startConnections(topicInfos: Iterable[PartitionTopicInfo],
+                       cluster: Cluster) {
     if (topicInfos == null)
       return
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala Thu May 31 01:51:23 2012
@@ -19,33 +19,17 @@ package kafka.consumer
 
 import scala.collection._
 import scala.util.parsing.json.JSON
-import kafka.utils.Logging
+import org.I0Itec.zkclient.ZkClient
+import java.util.regex.Pattern
+import kafka.utils.{ZKGroupDirs, ZkUtils, Logging}
 
-private[kafka] object TopicCount extends Logging {
-  val myConversionFunc = {input : String => input.toInt}
-  JSON.globalNumberParser = myConversionFunc
-
-  def constructTopicCount(consumerIdSting: String, jsonString : String) : TopicCount = {
-    var topMap : Map[String,Int] = null
-    try {
-      JSON.parseFull(jsonString) match {
-        case Some(m) => topMap = m.asInstanceOf[Map[String,Int]]
-        case None => throw new RuntimeException("error constructing TopicCount : " + jsonString)
-      }
-    } catch {
-      case e =>
-        error("error parsing consumer json string " + jsonString, e)
-        throw e
-    }
-
-    new TopicCount(consumerIdSting, topMap)
-  }
-
-}
-
-private[kafka] class TopicCount(val consumerIdString: String, val topicCountMap: Map[String, Int]) {
 
-  def getConsumerThreadIdsPerTopic(): Map[String, Set[String]] = {
+private[kafka] trait TopicCount {
+  def getConsumerThreadIdsPerTopic: Map[String, Set[String]]
+  def dbString: String
+  
+  protected def makeConsumerThreadIdsPerTopic(consumerIdString: String,
+                                            topicCountMap: Map[String,  Int]) = {
     val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[String]]()
     for ((topic, nConsumers) <- topicCountMap) {
       val consumerSet = new mutable.HashSet[String]
@@ -56,11 +40,96 @@ private[kafka] class TopicCount(val cons
     }
     consumerThreadIdsPerTopicMap
   }
+}
+
+private[kafka] object TopicCount extends Logging {
+
+  /*
+   * Example of whitelist topic count stored in ZooKeeper:
+   * Topics with whitetopic as prefix, and four streams: *4*whitetopic.*
+   *
+   * Example of blacklist topic count stored in ZooKeeper:
+   * Topics with blacktopic as prefix, and four streams: !4!blacktopic.*
+   */
+
+  val WHITELIST_MARKER = "*"
+  val BLACKLIST_MARKER = "!"
+  private val WHITELIST_PATTERN =
+    Pattern.compile("""\*(\p{Digit}+)\*(.*)""")
+  private val BLACKLIST_PATTERN =
+    Pattern.compile("""!(\p{Digit}+)!(.*)""")
+
+  val myConversionFunc = {input : String => input.toInt}
+  JSON.globalNumberParser = myConversionFunc
+
+  def constructTopicCount(group: String,
+                          consumerId: String,
+                          zkClient: ZkClient) : TopicCount = {
+    val dirs = new ZKGroupDirs(group)
+    val topicCountString = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)
+    val hasWhitelist = topicCountString.startsWith(WHITELIST_MARKER)
+    val hasBlacklist = topicCountString.startsWith(BLACKLIST_MARKER)
+
+    if (hasWhitelist || hasBlacklist)
+      info("Constructing topic count for %s from %s using %s as pattern."
+        .format(consumerId, topicCountString,
+          if (hasWhitelist) WHITELIST_PATTERN else BLACKLIST_PATTERN))
+
+    if (hasWhitelist || hasBlacklist) {
+      val matcher = if (hasWhitelist)
+        WHITELIST_PATTERN.matcher(topicCountString)
+      else
+        BLACKLIST_PATTERN.matcher(topicCountString)
+      require(matcher.matches())
+      val numStreams = matcher.group(1).toInt
+      val regex = matcher.group(2)
+      val filter = if (hasWhitelist)
+        new Whitelist(regex)
+      else
+        new Blacklist(regex)
+
+      new WildcardTopicCount(zkClient, consumerId, filter, numStreams)
+    }
+    else {
+      var topMap : Map[String,Int] = null
+      try {
+        JSON.parseFull(topicCountString) match {
+          case Some(m) => topMap = m.asInstanceOf[Map[String,Int]]
+          case None => throw new RuntimeException("error constructing TopicCount : " + topicCountString)
+        }
+      }
+      catch {
+        case e =>
+          error("error parsing consumer json string " + topicCountString, e)
+          throw e
+      }
+
+      new StaticTopicCount(consumerId, topMap)
+    }
+  }
+
+  def constructTopicCount(consumerIdString: String, topicCount: Map[String,  Int]) =
+    new StaticTopicCount(consumerIdString, topicCount)
+
+  def constructTopicCount(consumerIdString: String,
+                          filter: TopicFilter,
+                          numStreams: Int,
+                          zkClient: ZkClient) =
+    new WildcardTopicCount(zkClient, consumerIdString, filter, numStreams)
+
+}
+
+private[kafka] class StaticTopicCount(val consumerIdString: String,
+                                val topicCountMap: Map[String, Int])
+                                extends TopicCount {
+
+  def getConsumerThreadIdsPerTopic =
+    makeConsumerThreadIdsPerTopic(consumerIdString, topicCountMap)
 
   override def equals(obj: Any): Boolean = {
     obj match {
       case null => false
-      case n: TopicCount => consumerIdString == n.consumerIdString && topicCountMap == n.topicCountMap
+      case n: StaticTopicCount => consumerIdString == n.consumerIdString && topicCountMap == n.topicCountMap
       case _ => false
     }
   }
@@ -71,7 +140,7 @@ private[kafka] class TopicCount(val cons
    *    "topic2" : 4
    *  }
    */
-  def toJsonString() : String = {
+  def dbString = {
     val builder = new StringBuilder
     builder.append("{ ")
     var i = 0
@@ -82,6 +151,29 @@ private[kafka] class TopicCount(val cons
       i += 1
     }
     builder.append(" }")
-    builder.toString
+    builder.toString()
+  }
+}
+
+private[kafka] class WildcardTopicCount(zkClient: ZkClient,
+                                        consumerIdString: String,
+                                        topicFilter: TopicFilter,
+                                        numStreams: Int) extends TopicCount {
+  def getConsumerThreadIdsPerTopic = {
+    val wildcardTopics = ZkUtils.getChildrenParentMayNotExist(
+      zkClient, ZkUtils.BrokerTopicsPath).filter(topicFilter.isTopicAllowed(_))
+    makeConsumerThreadIdsPerTopic(consumerIdString,
+                                  Map(wildcardTopics.map((_, numStreams)): _*))
+  }
+
+  def dbString = {
+    val marker = topicFilter match {
+      case wl: Whitelist => TopicCount.WHITELIST_MARKER
+      case bl: Blacklist => TopicCount.BLACKLIST_MARKER
+    }
+
+    "%s%d%s%s".format(marker, numStreams, marker, topicFilter.regex)
   }
+
 }
+
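
TopicCount is now a trait with two implementations: StaticTopicCount keeps the old per-topic stream counts (serialized by dbString as a JSON-style map such as { "topic1": 2, "topic2": 4 }), while WildcardTopicCount registers a filter in ZooKeeper encoded as marker, stream count, marker, regex, e.g. *4*whitetopic.* for a whitelist or !4!blacktopic.* for a blacklist. A self-contained sketch of decoding the wildcard form is below; the whitelist pattern is copied from TopicCount above, while the demo object and variable names are illustrative only:

import java.util.regex.Pattern

object TopicCountFormatDemo {
  // Same regex as TopicCount.WHITELIST_PATTERN above.
  private val WhitelistPattern = Pattern.compile("""\*(\p{Digit}+)\*(.*)""")

  def main(args: Array[String]) {
    val registered = "*4*whitetopic.*"          // four streams over topics matching whitetopic.*
    val m = WhitelistPattern.matcher(registered)
    require(m.matches(), "not a whitelist topic count: " + registered)
    println("streams = " + m.group(1).toInt)    // prints: streams = 4
    println("regex   = " + m.group(2))          // prints: regex   = whitetopic.*
  }
}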