Posted to commits@kafka.apache.org by ne...@apache.org on 2012/02/07 00:10:55 UTC

svn commit: r1241249 - in /incubator/kafka/trunk/contrib/hadoop-producer: README.md src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java

Author: nehanarkhede
Date: Mon Feb  6 23:10:54 2012
New Revision: 1241249

URL: http://svn.apache.org/viewvc?rev=1241249&view=rev
Log:
KAFKA-257 Hadoop producer should use software load balancer; patched by Sam Shah; reviewed by nehanarkhede

Modified:
    incubator/kafka/trunk/contrib/hadoop-producer/README.md
    incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
    incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java

Modified: incubator/kafka/trunk/contrib/hadoop-producer/README.md
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/README.md?rev=1241249&r1=1241248&r2=1241249&view=diff
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-producer/README.md (original)
+++ incubator/kafka/trunk/contrib/hadoop-producer/README.md Mon Feb  6 23:10:54 2012
@@ -1,6 +1,14 @@
 Hadoop to Kafka Bridge
 ======================
 
+What's new?
+-----------
+
+* Now supports Kafka's software load balancer (Kafka URIs are specified with
+  kafka+zk as the scheme, as described below)
+* Supports Kafka 0.7. Now uses the new Producer API, rather than the legacy
+  SyncProducer.
+
 What is it?
 -----------
 
@@ -17,8 +25,10 @@ multiple times in the same push.
 How do I use it?
 ----------------
 
-With this bridge, Kafka topics are URIs and are specified as
-`kafka://<kafka-server>/<kafka-topic>`.
+With this bridge, Kafka topics are specified as URIs in one of two formats:
+`kafka+zk://<zk-connect-path>#<kafka-topic>`, which uses the software load
+balancer, or the legacy `kafka://<kafka-server>/<kafka-topic>`, which connects
+to a specific Kafka broker.
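
For illustration, both URI forms decompose cleanly with java.net.URI; this is
the same parsing the output format performs. A minimal sketch (the class and
variable names here are illustrative, not part of the bridge's API):

    import java.net.URI;

    public class KafkaUriSketch
    {
      public static void main(String[] args)
      {
        // Load-balanced form: authority + path form the ZooKeeper connect
        // string, and the fragment names the topic.
        URI zk = URI.create("kafka+zk://my-zookeeper:2181/kafka#member_info");
        String zkConnect = zk.getAuthority() + zk.getPath(); // my-zookeeper:2181/kafka
        String zkTopic   = zk.getFragment();                 // member_info

        // Legacy form: the authority is a comma-separated broker list and
        // the path (minus the leading '/') names the topic.
        URI direct = URI.create("kafka://my-broker:9092,my-broker2:9092/member_info");
        String brokerList = direct.getAuthority();           // my-broker:9092,my-broker2:9092
        String topic      = direct.getPath().substring(1);   // member_info

        System.out.println(zkConnect + " -> " + zkTopic);
        System.out.println(brokerList + " -> " + topic);
      }
    }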
 
 ### Pig ###
 
@@ -27,17 +37,19 @@ row. To push data via Kafka, store to th
 with the Avro schema as its first argument. You'll need to register the
 appropriate Kafka JARs. Here is what an example Pig script looks like:
 
-    REGISTER hadoop-kafka-bridge-0.5.2.jar;
+    REGISTER hadoop-producer_2.8.0-0.7.0.jar;
     REGISTER avro-1.4.0.jar;
     REGISTER piggybank.jar;
-    REGISTER kafka-0.5.2.jar;
+    REGISTER kafka-0.7.0.jar;
     REGISTER jackson-core-asl-1.5.5.jar;
     REGISTER jackson-mapper-asl-1.5.5.jar;
+    REGISTER zkclient-20110412.jar;
+    REGISTER zookeeper-3.3.4.jar;
     REGISTER scala-library.jar;
 
     member_info = LOAD 'member_info.tsv' as (member_id : int, name : chararray);
     names = FOREACH member_info GENERATE name;
-    STORE member_info INTO 'kafka://my-broker:9092/member_info' USING kafka.bridge.AvroKafkaStorage('"string"');
+    STORE member_info INTO 'kafka+zk://my-zookeeper:2181/kafka#member_info' USING kafka.bridge.AvroKafkaStorage('"string"');
 
 That's it! The Pig StoreFunc makes use of AvroStorage in Piggybank to convert
 from Pig's data model to the specified Avro schema.
@@ -46,8 +58,8 @@ Further, multi-store is possible with Ka
 multiple topics and brokers in the same job:
 
     SPLIT member_info INTO early_adopters IF member_id < 1000, others IF member_id >= 1000;
-    STORE early_adopters INTO 'kafka://my-broker:9092/early_adopters' USING AvroKafkaStorage('$schema');
-    STORE others INTO 'kafka://my-broker:9092/others' USING AvroKafkaStorage('$schema');
+    STORE early_adopters INTO 'kafka+zk://my-zookeeper:2181/kafka#early_adopters' USING AvroKafkaStorage('$schema');
+    STORE others INTO 'kafka://my-broker:9092,my-broker2:9092/others' USING AvroKafkaStorage('$schema');
 
 ### KafkaOutputFormat ###
 
@@ -126,9 +138,10 @@ Normally, you needn't change any of thes
   docs). Default is 64*1024 (64KB). 
 * kafka.output.max_msgsize: Maximum message size in bytes (see Kafka producer
   docs). Default is 1024*1024 (1MB).
+* kafka.output.compression_codec: The compression codec to use (see Kafka producer
+  docs). Default is 0 (no compression).
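
Since these are ordinary Hadoop configuration keys, they can be set on the job
before submission. A minimal sketch, assuming a hypothetical job named
kafka-bridge-example (the values shown simply restate the defaults):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class KafkaJobSetup
    {
      public static void main(String[] args) throws Exception
      {
        Configuration conf = new Configuration();
        // Keys as documented above; the values here are only the defaults
        // made explicit.
        conf.setInt("kafka.output.queue_size", 10 * 1024 * 1024);
        conf.setInt("kafka.output.bufsize", 64 * 1024);
        conf.setInt("kafka.output.max_msgsize", 1024 * 1024);
        conf.setInt("kafka.output.compression_codec", 0);

        Job job = new Job(conf, "kafka-bridge-example");
        // ... configure input, mapper, and KafkaOutputFormat as usual ...
      }
    }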
 
-For easier debugging, the above values as well as the Kafka URI
-(kafka.output.url), the output server (kafka.output.server), the topic
-(kafka.output.topic), and the schema (kafka.output.schema) are injected into
-the job's configuration.
+For easier debugging, the above values as well as the Kafka broker information
+(either kafka.zk.connect or kafka.broker.list), the topic (kafka.output.topic),
+and the schema (kafka.output.schema) are injected into the job's configuration.
 

Modified: incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java?rev=1241249&r1=1241248&r2=1241249&view=diff
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java (original)
+++ incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java Mon Feb  6 23:10:54 2012
@@ -18,27 +18,42 @@ package kafka.bridge.hadoop;
 
 import java.util.Properties;
 
-import kafka.javaapi.producer.SyncProducer;
-import kafka.producer.SyncProducerConfig;
+import kafka.javaapi.producer.Producer;
+import kafka.message.Message;
+import kafka.producer.ProducerConfig;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.net.URI;
 
 public class KafkaOutputFormat<W extends BytesWritable> extends OutputFormat<NullWritable, W>
 {
+  private Logger log = Logger.getLogger(KafkaOutputFormat.class);
+
   public static final String KAFKA_URL = "kafka.output.url";
+  /** Bytes to buffer before the OutputFormat does a send */
+  public static final int KAFKA_QUEUE_SIZE = 10*1024*1024;
+
+  /** Default value for Kafka's connect.timeout.ms */
   public static final int KAFKA_PRODUCER_CONNECT_TIMEOUT = 30*1000;
+  /** Default value for Kafka's reconnect.interval */
   public static final int KAFKA_PRODUCER_RECONNECT_INTERVAL = 1000;
+  /** Default value for Kafka's buffer.size */
   public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64*1024;
+  /** Default value for Kafka's max.message.size */
   public static final int KAFKA_PRODUCER_MAX_MESSAGE_SIZE = 1024*1024;
-  public static final int KAFKA_QUEUE_SIZE = 10*1024*1024;
+  /** Default value for Kafka's producer.type */
+  public static final String KAFKA_PRODUCER_PRODUCER_TYPE = "sync";
+  /** Default value for Kafka's compression.codec */
+  public static final int KAFKA_PRODUCER_COMPRESSION_CODEC = 0;
 
   public KafkaOutputFormat()
   {
@@ -77,40 +92,80 @@ public class KafkaOutputFormat<W extends
     Path outputPath = getOutputPath(context);
     if (outputPath == null)
       throw new IllegalArgumentException("no kafka output url specified");
-    URI uri = outputPath.toUri();
+    URI uri = URI.create(outputPath.toString());
     Configuration job = context.getConfiguration();
 
-    final String topic = uri.getPath().substring(1);        // ignore the initial '/' in the path
+    Properties props = new Properties();
+    String topic;
 
     final int queueSize = job.getInt("kafka.output.queue_size", KAFKA_QUEUE_SIZE);
     final int timeout = job.getInt("kafka.output.connect_timeout", KAFKA_PRODUCER_CONNECT_TIMEOUT);
     final int interval = job.getInt("kafka.output.reconnect_interval", KAFKA_PRODUCER_RECONNECT_INTERVAL);
     final int bufSize = job.getInt("kafka.output.bufsize", KAFKA_PRODUCER_BUFFER_SIZE);
     final int maxSize = job.getInt("kafka.output.max_msgsize", KAFKA_PRODUCER_MAX_MESSAGE_SIZE);
+    final String producerType = job.get("kafka.output.producer_type", KAFKA_PRODUCER_PRODUCER_TYPE);
+    final int compressionCodec = job.getInt("kafka.output.compression_codec", KAFKA_PRODUCER_COMPRESSION_CODEC);
 
-    job.set("kafka.output.server", String.format("%s:%d", uri.getHost(), uri.getPort()));
-    job.set("kafka.output.topic", topic);
     job.setInt("kafka.output.queue_size", queueSize);
     job.setInt("kafka.output.connect_timeout", timeout);
     job.setInt("kafka.output.reconnect_interval", interval);
     job.setInt("kafka.output.bufsize", bufSize);
     job.setInt("kafka.output.max_msgsize", maxSize);
+    job.set("kafka.output.producer_type", producerType);
+    job.setInt("kafka.output.compression_codec", compressionCodec);
 
-    if (uri.getHost().isEmpty())
-      throw new IllegalArgumentException("missing kafka server");
-    if (uri.getPath().isEmpty())
-      throw new IllegalArgumentException("missing kafka topic");
-
-    Properties props = new Properties();
-    props.setProperty("host", uri.getHost());
-    props.setProperty("port", Integer.toString(uri.getPort()));
+    props.setProperty("producer.type", producerType);
     props.setProperty("buffer.size", Integer.toString(bufSize));
     props.setProperty("connect.timeout.ms", Integer.toString(timeout));
     props.setProperty("reconnect.interval", Integer.toString(interval));
     props.setProperty("max.message.size", Integer.toString(maxSize));
+    props.setProperty("compression.codec", Integer.toString(compressionCodec));
 
-    SyncProducer producer = new SyncProducer(new SyncProducerConfig(props));
+    if (uri.getScheme().equals("kafka+zk")) {
+      // Software load balancer:
+      //  URL: kafka+zk://<zk connect path>#<kafka topic>
+      //  e.g. kafka+zk://kafka-zk:2181/kafka#foobar
+
+      String zkConnect = uri.getAuthority() + uri.getPath();
+
+      props.setProperty("zk.connect", zkConnect);
+      job.set("kafka.zk.connect", zkConnect);
+
+      topic = uri.getFragment();
+      if (topic == null)
+        throw new IllegalArgumentException("no topic specified in kafka uri fragment");
+
+      log.info(String.format("using kafka zk.connect %s (topic %s)", zkConnect, topic));
+    } else if (uri.getScheme().equals("kafka")) {
+      // using the legacy direct broker list
+      // URL: kafka://<kafka host>/<topic>
+      // e.g. kafka://kafka-server:9000,kafka-server2:9000/foobar
+
+      // Just enumerate broker_ids, as it really doesn't matter what they are as long as they're unique
+      // (KAFKA-258 will remove the broker_id requirement)
+      StringBuilder brokerListBuilder = new StringBuilder();
+      String delim = "";
+      int brokerId = 0;
+      for (String serverPort : uri.getAuthority().split(",")) {
+        brokerListBuilder.append(delim).append(String.format("%d:%s", brokerId, serverPort));
+        delim = ",";
+        brokerId++;
+      }
+      String brokerList = brokerListBuilder.toString();
+
+      props.setProperty("broker.list", brokerList);
+      job.set("kafka.broker.list", brokerList);
+
+      if (uri.getPath() == null || uri.getPath().length() <= 1)
+        throw new IllegalArgumentException("no topic specified in kafka uri");
+
+      topic = uri.getPath().substring(1);             // ignore the initial '/' in the path
+      job.set("kafka.output.topic", topic);
+      log.info(String.format("using kafka broker %s (topic %s)", brokerList, topic));
+    } else
+      throw new IllegalArgumentException("unsupported scheme in kafka uri (must be kafka:// or kafka+zk://)");
+
+    Producer<Integer, Message> producer = new Producer<Integer, Message>(new ProducerConfig(props));
     return new KafkaRecordWriter<W>(producer, topic, queueSize);
   }
 }
-
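
A note on the broker.list construction above: for the legacy kafka:// scheme,
the patch assigns sequential ids to each host:port in the URI authority, since
the 0.7 producer's broker.list entries take the form id:host:port. The same
logic, extracted as a standalone sketch (the class and method names are
illustrative):

    public class BrokerListSketch
    {
      // Turns "host1:9092,host2:9092" into "0:host1:9092,1:host2:9092", the
      // id:host:port form the 0.7 producer expects in broker.list. The ids
      // are arbitrary but must be unique (KAFKA-258 tracks removing the
      // broker_id requirement entirely).
      static String toBrokerList(String authority)
      {
        StringBuilder builder = new StringBuilder();
        String delim = "";
        int brokerId = 0;
        for (String serverPort : authority.split(",")) {
          builder.append(delim).append(String.format("%d:%s", brokerId, serverPort));
          delim = ",";
          brokerId++;
        }
        return builder.toString();
      }

      public static void main(String[] args)
      {
        // Prints 0:my-broker:9092,1:my-broker2:9092
        System.out.println(toBrokerList("my-broker:9092,my-broker2:9092"));
      }
    }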

Modified: incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java?rev=1241249&r1=1241248&r2=1241249&view=diff
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java (original)
+++ incubator/kafka/trunk/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java Mon Feb  6 23:10:54 2012
@@ -16,30 +16,29 @@
  */
 package kafka.bridge.hadoop;
 
+import kafka.javaapi.producer.Producer;
+import kafka.javaapi.producer.ProducerData;
 import kafka.message.Message;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.javaapi.producer.SyncProducer;
 
-import kafka.message.NoCompressionCodec;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 
 public class KafkaRecordWriter<W extends BytesWritable> extends RecordWriter<NullWritable, W>
 {
-  protected SyncProducer producer;
+  protected Producer<Integer, Message> producer;
   protected String topic;
 
-  protected List<Message> msgList = new ArrayList<Message>();
+  protected List<ProducerData<Integer, Message>> msgList = new LinkedList<ProducerData<Integer, Message>>();
   protected int totalSize = 0;
   protected int queueSize;
 
-  public KafkaRecordWriter(SyncProducer producer, String topic, int queueSize)
+  public KafkaRecordWriter(Producer<Integer, Message> producer, String topic, int queueSize)
   {
     this.producer = producer;
     this.topic = topic;
@@ -49,8 +48,7 @@ public class KafkaRecordWriter<W extends
   protected void sendMsgList()
   {
     if (msgList.size() > 0) {
-      ByteBufferMessageSet msgSet = new ByteBufferMessageSet(kafka.message.NoCompressionCodec$.MODULE$, msgList);
-      producer.send(topic, msgSet);
+      producer.send(msgList);
       msgList.clear();
       totalSize = 0;
     }
@@ -60,10 +58,11 @@ public class KafkaRecordWriter<W extends
   public void write(NullWritable key, BytesWritable value) throws IOException, InterruptedException
   {
     Message msg = new Message(value.getBytes());
-    msgList.add(msg);
+    msgList.add(new ProducerData<Integer, Message>(this.topic, msg));
     totalSize += msg.size();
 
-    if (totalSize > queueSize)
+    // MultiProducerRequest only supports sending up to Short.MAX_VALUE messages in one batch
+    if (totalSize > queueSize || msgList.size() >= Short.MAX_VALUE)
       sendMsgList();
   }
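
The flush condition above combines two limits: the byte threshold configured
via kafka.output.queue_size and the Short.MAX_VALUE-messages-per-batch cap
noted in the comment. Extracted as a standalone predicate for clarity (a
sketch; the class and method names are illustrative):

    public class FlushSketch
    {
      // Flush when the buffered bytes exceed the configured queue size, or
      // when the batch reaches the Short.MAX_VALUE message cap imposed by
      // MultiProducerRequest in Kafka 0.7.
      static boolean shouldFlush(int totalSize, int queueSize, int batchCount)
      {
        return totalSize > queueSize || batchCount >= Short.MAX_VALUE;
      }
    }

With the default 10MB queue size, a stream of 100-byte messages hits the
32,767-message cap (roughly 3.3MB buffered) well before the byte threshold,
which is exactly the case the added guard handles.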