Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/04 05:22:03 UTC

[4/37] git commit: KAFKA-713 Update Hadoop producer for Kafka 0.8 changes; reviewed by Neha Narkhede

KAFKA-713 Update Hadoop producer for Kafka 0.8 changes; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/295734f6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/295734f6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/295734f6

Branch: refs/heads/trunk
Commit: 295734f6abb69d0c184faa0085ca198040b2adbf
Parents: 2ae0690
Author: Sam Shah <sh...@umich.edu>
Authored: Fri Feb 1 10:46:00 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Feb 1 10:46:02 2013 -0800

----------------------------------------------------------------------
 contrib/hadoop-producer/README.md                  |  119 ++++-----------
 .../java/kafka/bridge/examples/TextPublisher.java  |   14 +-
 .../kafka/bridge/hadoop/KafkaOutputFormat.java     |  103 +++++--------
 .../kafka/bridge/hadoop/KafkaRecordWriter.java     |   35 +++--
 .../java/kafka/bridge/pig/AvroKafkaStorage.java    |    8 +-
 project/build/KafkaProject.scala                   |    2 +-
 6 files changed, 108 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/295734f6/contrib/hadoop-producer/README.md
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/README.md b/contrib/hadoop-producer/README.md
index 6e57fde..1bd3721 100644
--- a/contrib/hadoop-producer/README.md
+++ b/contrib/hadoop-producer/README.md
@@ -4,16 +4,20 @@ Hadoop to Kafka Bridge
 What's new?
 -----------
 
-* Now supports Kafka's software load balancer (Kafka URIs are specified with
-  kafka+zk as the scheme, as described below)
-* Supports Kafka 0.7. Now uses the new Producer API, rather than the legacy
-  SyncProducer.
+* Kafka 0.8 support
+  * No more ZK-based load balancing (backwards incompatible change)
+* Semantic partitioning is now supported in KafkaOutputFormat. Just specify a
+  key in the output collector of your job. The Pig StoreFunc doesn't support
+  semantic partitioning.
+* Config parameters are now the same as the Kafka producer, just prepended with
+  kafka.output (e.g., kafka.output.max.message.size). This is a backwards
+  incompatible change.
 
 What is it?
 -----------
 
 The Hadoop to Kafka bridge is a way to publish data from Hadoop to Kafka. There
-are two possible mechanisms, varying from easy to difficult:  writing a Pig
+are two possible mechanisms, varying from easy to difficult: writing a Pig
 script and writing messages in Avro format, or rolling your own job using the
 Kafka `OutputFormat`. 
 
@@ -25,10 +29,8 @@ multiple times in the same push.
 How do I use it?
 ----------------
 
-With this bridge, Kafka topics are URIs and are specified in one of two
-formats: `kafka+zk://<zk-path>#<kafka-topic>`, which uses the software load
-balancer, or the legacy `kafka://<kafka-server>/<kafka-topic>` to connect to a
-specific Kafka broker.
+With this bridge, Kafka topics are specified as URIs of the form
+`kafka://<kafka-server>/<kafka-topic>`, each connecting to a specific Kafka broker.
 
 ### Pig ###
 
@@ -37,19 +39,17 @@ row. To push data via Kafka, store to the Kafka URI using `AvroKafkaStorage`
 with the Avro schema as its first argument. You'll need to register the
 appropriate Kafka JARs. Here is what an example Pig script looks like:
 
-    REGISTER hadoop-producer_2.8.0-0.7.0.jar;
+    REGISTER hadoop-producer_2.8.0-0.8.0.jar;
     REGISTER avro-1.4.0.jar;
     REGISTER piggybank.jar;
-    REGISTER kafka-0.7.0.jar;
+    REGISTER kafka-0.8.0.jar;
     REGISTER jackson-core-asl-1.5.5.jar;
     REGISTER jackson-mapper-asl-1.5.5.jar;
-    REGISTER zkclient-20110412.jar;
-    REGISTER zookeeper-3.3.4.jar;
     REGISTER scala-library.jar;
 
-    member_info = LOAD 'member_info.tsv' as (member_id : int, name : chararray);
+    member_info = LOAD 'member_info.tsv' AS (member_id : int, name : chararray);
     names = FOREACH member_info GENERATE name;
-    STORE member_info INTO 'kafka+zk://my-zookeeper:2181/kafka#member_info' USING kafka.bridge.AvroKafkaStorage('"string"');
+    STORE member_info INTO 'kafka://my-kafka:9092/member_info' USING kafka.bridge.AvroKafkaStorage('"string"');
 
 That's it! The Pig StoreFunc makes use of AvroStorage in Piggybank to convert
 from Pig's data model to the specified Avro schema.
@@ -58,8 +58,8 @@ Further, multi-store is possible with KafkaStorage, so you can easily write to
 multiple topics and brokers in the same job:
 
     SPLIT member_info INTO early_adopters IF member_id < 1000, others IF member_id >= 1000;
-    STORE early_adopters INTO 'kafka+zk://my-zookeeper:2181/kafka#early_adopters' USING AvroKafkaStorage('$schema');
-    STORE others INTO 'kafka://my-broker:9092,my-broker2:9092/others' USING AvroKafkaStorage('$schema');
+    STORE early_adopters INTO 'kafka://my-broker:9092/early_adopters' USING AvroKafkaStorage('$schema');
+    STORE others INTO 'kafka://my-broker2:9092/others' USING AvroKafkaStorage('$schema');
 
 ### KafkaOutputFormat ###
 
@@ -68,80 +68,27 @@ uses the newer 0.20 mapreduce APIs and simply pushes bytes (i.e.,
 BytesWritable). This is a lower-level method of publishing data, as it allows
 you to precisely control output.
 
-Here is an example that publishes some input text. With KafkaOutputFormat, the
-key is a NullWritable and is ignored; only values are published. Speculative
-execution is turned off by the OutputFormat.
-
-    import kafka.bridge.hadoop.KafkaOutputFormat;
-    
-    import org.apache.hadoop.fs.Path;
-    import org.apache.hadoop.io.BytesWritable;
-    import org.apache.hadoop.io.NullWritable;
-    import org.apache.hadoop.io.Text;
-    import org.apache.hadoop.mapreduce.Job;
-    import org.apache.hadoop.mapreduce.Mapper;
-    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-    
-    import java.io.IOException;
-    
-    public class TextPublisher
-    {
-      public static void main(String[] args) throws Exception
-      {
-        if (args.length != 2) {
-          System.err.println("usage: <input path> <kafka output url>");
-          return;
-        }
-    
-        Job job = new Job();
-    
-        job.setJarByClass(TextPublisher.class);
-        job.setOutputKeyClass(NullWritable.class);
-        job.setOutputValueClass(BytesWritable.class);
-        job.setInputFormatClass(TextInputFormat.class);
-        job.setOutputFormatClass(KafkaOutputFormat.class);
-    
-        job.setMapperClass(TheMapper.class);
-        job.setNumReduceTasks(0);
-    
-        FileInputFormat.addInputPath(job, new Path(args[0]));
-        KafkaOutputFormat.setOutputPath(job, new Path(args[1]));
-    
-        if (!job.waitForCompletion(true)) {
-          throw new RuntimeException("Job failed!");
-        }
-      }
-    
-      public static class TheMapper extends Mapper<Object, Object, NullWritable, BytesWritable>
-      {
-        @Override
-        protected void map(Object key, Object value, Context context) throws IOException, InterruptedException
-        {
-          context.write(NullWritable.get(), new BytesWritable(((Text) value).getBytes()));
-        }
-      }
-    }
+Included is an example that publishes some input text line-by-line to a topic.
+With KafkaOutputFormat, the key can be null, in which case it is ignored by the
+producer (random partitioning), or any object for semantic partitioning of the
+stream (with an appropriate Kafka partitioner set). Speculative execution is
+turned off by the OutputFormat.
 
 What can I tune?
 ----------------
 
-Normally, you needn't change any of these parameters:
-
-* kafka.output.queue_size: Bytes to queue in memory before pushing to the Kafka
+* kafka.output.queue.size: Bytes to queue in memory before pushing to the Kafka
   producer (i.e., the batch size). Default is 10*1024*1024 (10MB).
-* kafka.output.connect_timeout: Connection timeout in milliseconds (see Kafka
-  producer docs). Default is 30*1000 (30s).
-* kafka.output.reconnect_timeout: Milliseconds to wait until attempting
-  reconnection (see Kafka producer docs). Default is 1000 (1s).
-* kafka.output.bufsize: Producer buffer size in bytes (see Kafka producer
-  docs). Default is 64*1024 (64KB). 
-* kafka.output.max_msgsize: Maximum message size in bytes (see Kafka producer
-  docs). Default is 1024*1024 (1MB).
-* kafka.output.compression_codec: The compression codec to use (see Kafka producer
-  docs). Default is 0 (no compression).
+
+Any of Kafka's producer parameters can be changed by prefixing them with
+"kafka.output" in one's job configuration. For example, to change the
+compression codec, one would add the "kafka.output.compression.codec" parameter
+(e.g., "SET kafka.output.compression.codec 0" in one's Pig script for no
+compression). 
 
 For easier debugging, the above values as well as the Kafka broker information
-(either kafka.zk.connect or kafka.broker.list), the topic (kafka.output.topic),
-and the schema (kafka.output.schema) are injected into the job's configuration.
+(kafka.output.broker.list), the topic (kafka.output.topic), and the schema
+(kafka.output.schema) are injected into the job's configuration. By default,
+the Hadoop producer uses Kafka's sync producer, as asynchronous operation
+doesn't make sense in the batch Hadoop case.
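
For jobs that want the semantic partitioning described above, the mapper only has to emit a non-null key; KafkaRecordWriter passes the key through to the producer and accepts byte[] or BytesWritable values. A minimal sketch (illustrative only, not part of this patch: the class name, the tab-separated key column, and the partitioner setting are assumptions) could look like this:

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Hypothetical mapper: emits a non-null key so the bridge builds keyed
    // messages instead of null-keyed (randomly partitioned) ones.
    public class KeyedTextMapper extends Mapper<Object, Text, Object, Object>
    {
      @Override
      protected void map(Object key, Text value, Context context) throws IOException, InterruptedException
      {
        String line = value.toString();
        byte[] partitionKey = line.split("\t", 2)[0].getBytes("UTF-8"); // illustrative: first TSV column as the key
        context.write(partitionKey, line.getBytes("UTF-8"));            // keyed message for semantic partitioning
      }
    }

A byte[] key is the simplest choice with the default producer settings, since the stock encoder handles raw bytes. Routing by that key still requires a suitable partitioner, e.g. setting kafka.output.partitioner.class in the job configuration (forwarded to the producer's partitioner.class by KafkaOutputFormat).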
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/295734f6/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
index 5acbcee..d447b1d 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
@@ -16,18 +16,18 @@
  */
 package kafka.bridge.examples;
 
-
 import java.io.IOException;
 import kafka.bridge.hadoop.KafkaOutputFormat;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 
+/**
+ * Publish a text file line by line to a Kafka topic
+ */
 public class TextPublisher
 {
   public static void main(String[] args) throws Exception
@@ -40,8 +40,6 @@ public class TextPublisher
     Job job = new Job();
 
     job.setJarByClass(TextPublisher.class);
-    job.setOutputKeyClass(NullWritable.class);
-    job.setOutputValueClass(BytesWritable.class);
     job.setInputFormatClass(TextInputFormat.class);
     job.setOutputFormatClass(KafkaOutputFormat.class);
 
@@ -56,12 +54,12 @@ public class TextPublisher
     }
   }
 
-  public static class TheMapper extends Mapper<Object, Object, NullWritable, BytesWritable>
+  public static class TheMapper extends Mapper<Object, Text, Object, Object>
   {
     @Override
-    protected void map(Object key, Object value, Context context) throws IOException, InterruptedException
+    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException
     {
-      context.write(NullWritable.get(), new BytesWritable(((Text) value).getBytes()));
+      context.write(null, value.getBytes());
     }
   }
 }
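
Since any producer parameter can now be passed through the kafka.output prefix, a driver can place overrides directly in the job configuration. The sketch below is hypothetical (the class name, paths, and the chosen override values are examples, not part of this patch); it reuses TextPublisher's mapper and shows where such overrides would go:

    import kafka.bridge.examples.TextPublisher;
    import kafka.bridge.hadoop.KafkaOutputFormat;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class ConfiguredPublisher
    {
      public static void main(String[] args) throws Exception
      {
        Configuration conf = new Configuration();
        // Producer overrides: KafkaOutputFormat strips the "kafka.output." prefix,
        // so these reach the producer as compression.codec and request.required.acks.
        conf.set("kafka.output.compression.codec", "0");        // no compression
        conf.set("kafka.output.request.required.acks", "1");    // example 0.8 producer setting
        // KafkaOutputFormat-specific batch threshold (bytes buffered before a send)
        conf.setInt("kafka.output.queue.size", 4 * 1024 * 1024);

        Job job = new Job(conf);
        job.setJarByClass(ConfiguredPublisher.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(KafkaOutputFormat.class);
        job.setMapperClass(TextPublisher.TheMapper.class);
        job.setNumReduceTasks(0);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Broker and topic come from the output URI, e.g. kafka://my-broker:9092/my-topic
        KafkaOutputFormat.setOutputPath(job, new Path(args[1]));

        if (!job.waitForCompletion(true)) {
          throw new RuntimeException("Job failed!");
        }
      }
    }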

http://git-wip-us.apache.org/repos/asf/kafka/blob/295734f6/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
index 2fd2035..aa1f944 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
@@ -16,19 +16,16 @@
  */
 package kafka.bridge.hadoop;
 
-
 import java.io.IOException;
 import java.net.URI;
-import java.util.Properties;
+import java.util.*;
 
 import kafka.common.KafkaException;
 import kafka.javaapi.producer.Producer;
-import kafka.message.Message;
 import kafka.producer.ProducerConfig;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -38,26 +35,26 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.log4j.Logger;
 
-public class KafkaOutputFormat<W extends BytesWritable> extends OutputFormat<NullWritable, W>
+public class KafkaOutputFormat<K, V> extends OutputFormat<K, V>
 {
   private Logger log = Logger.getLogger(KafkaOutputFormat.class);
 
   public static final String KAFKA_URL = "kafka.output.url";
-  /** Bytes to buffer before the OutputFormat does a send */
+  /** Bytes to buffer before the OutputFormat does a send (i.e., the amortization window) */
   public static final int KAFKA_QUEUE_SIZE = 10*1024*1024;
 
-  /** Default value for Kafka's connect.timeout.ms */
-  public static final int KAFKA_PRODUCER_CONNECT_TIMEOUT = 30*1000;
-  /** Default value for Kafka's reconnect.interval*/
-  public static final int KAFKA_PRODUCER_RECONNECT_INTERVAL = 1000;
-  /** Default value for Kafka's buffer.size */
-  public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64*1024;
-  /** Default value for Kafka's max.message.size */
-  public static final int KAFKA_PRODUCER_MAX_MESSAGE_SIZE = 1024*1024;
-  /** Default value for Kafka's producer.type */
-  public static final String KAFKA_PRODUCER_PRODUCER_TYPE = "sync";
-  /** Default value for Kafka's compression.codec */
-  public static final int KAFKA_PRODUCER_COMPRESSION_CODEC = 0;
+  public static final String KAFKA_CONFIG_PREFIX = "kafka.output";
+  private static final Map<String, String> kafkaConfigMap;
+  static {
+    Map<String, String> cMap = new HashMap<String, String>();
+
+    // default Hadoop producer configs
+    cMap.put("producer.type",       "sync");
+    cMap.put("send.buffer.bytes",   Integer.toString(64*1024));
+    cMap.put("compression.codec",   Integer.toString(1));
+
+    kafkaConfigMap = Collections.unmodifiableMap(cMap);
+  }
 
   public KafkaOutputFormat()
   {
@@ -91,7 +88,7 @@ public class KafkaOutputFormat<W extends BytesWritable> extends OutputFormat<Nul
   }
 
   @Override
-  public RecordWriter<NullWritable, W> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException
   {
     Path outputPath = getOutputPath(context);
     if (outputPath == null)
@@ -102,58 +99,44 @@ public class KafkaOutputFormat<W extends BytesWritable> extends OutputFormat<Nul
     Properties props = new Properties();
     String topic;
 
-    final int queueSize = job.getInt("kafka.output.queue_size", KAFKA_QUEUE_SIZE);
-    final int timeout = job.getInt("kafka.output.connect_timeout", KAFKA_PRODUCER_CONNECT_TIMEOUT);
-    final int interval = job.getInt("kafka.output.reconnect_interval", KAFKA_PRODUCER_RECONNECT_INTERVAL);
-    final int bufSize = job.getInt("kafka.output.bufsize", KAFKA_PRODUCER_BUFFER_SIZE);
-    final int maxSize = job.getInt("kafka.output.max_msgsize", KAFKA_PRODUCER_MAX_MESSAGE_SIZE);
-    final String producerType = job.get("kafka.output.producer_type", KAFKA_PRODUCER_PRODUCER_TYPE);
-    final int compressionCodec = job.getInt("kafka.output.compression_codec", KAFKA_PRODUCER_COMPRESSION_CODEC);
-
-    job.setInt("kafka.output.queue_size", queueSize);
-    job.setInt("kafka.output.connect_timeout", timeout);
-    job.setInt("kafka.output.reconnect_interval", interval);
-    job.setInt("kafka.output.bufsize", bufSize);
-    job.setInt("kafka.output.max_msgsize", maxSize);
-    job.set("kafka.output.producer_type", producerType);
-    job.setInt("kafka.output.compression_codec", compressionCodec);
-
-    props.setProperty("producer.type", producerType);
-    props.setProperty("send.buffer.bytes", Integer.toString(bufSize));
-    props.setProperty("connect.timeout.ms", Integer.toString(timeout));
-    props.setProperty("reconnect.interval", Integer.toString(interval));
-    props.setProperty("compression.codec", Integer.toString(compressionCodec));
+    props.putAll(kafkaConfigMap);                       // inject default configuration
+    for (Map.Entry<String, String> m : job) {           // handle any overrides
+      if (!m.getKey().startsWith(KAFKA_CONFIG_PREFIX))
+        continue;
+      if (m.getKey().equals(KAFKA_URL))
+        continue;
+
+      String kafkaKeyName = m.getKey().substring(KAFKA_CONFIG_PREFIX.length()+1);
+      props.setProperty(kafkaKeyName, m.getValue());    // set Kafka producer property
+    }
+
+    // inject Kafka producer props back into jobconf for easier debugging
+    for (Map.Entry<Object, Object> m : props.entrySet()) {
+      job.set(KAFKA_CONFIG_PREFIX + "." + m.getKey().toString(), m.getValue().toString());
+    }
+
+    // KafkaOutputFormat specific parameters
+    final int queueSize = job.getInt(KAFKA_CONFIG_PREFIX + ".queue.size", KAFKA_QUEUE_SIZE);
+    job.setInt(KAFKA_CONFIG_PREFIX + ".queue.size", queueSize);
 
     if (uri.getScheme().equals("kafka")) {
-      // using the legacy direct broker list
+      // using the direct broker list
       // URL: kafka://<kafka host>/<topic>
       // e.g. kafka://kafka-server:9000,kafka-server2:9000/foobar
-
-      // Just enumerate broker_ids, as it really doesn't matter what they are as long as they're unique
-      // (KAFKA-258 will remove the broker_id requirement)
-      StringBuilder brokerListBuilder = new StringBuilder();
-      String delim = "";
-      int brokerId = 0;
-      for (String serverPort : uri.getAuthority().split(",")) {
-        brokerListBuilder.append(delim).append(String.format("%d:%s", brokerId, serverPort));
-        delim = ",";
-        brokerId++;
-      }
-      String brokerList = brokerListBuilder.toString();
-
+      String brokerList = uri.getAuthority();
       props.setProperty("broker.list", brokerList);
-      job.set("kafka.broker.list", brokerList);
+      job.set(KAFKA_CONFIG_PREFIX + ".broker.list", brokerList);
 
       if (uri.getPath() == null || uri.getPath().length() <= 1)
         throw new KafkaException("no topic specified in kafka uri");
 
-      topic = uri.getPath().substring(1);             // ignore the initial '/' in the path
-      job.set("kafka.output.topic", topic);
+      topic = uri.getPath().substring(1);               // ignore the initial '/' in the path
+      job.set(KAFKA_CONFIG_PREFIX + ".topic", topic);
       log.info(String.format("using kafka broker %s (topic %s)", brokerList, topic));
     } else
       throw new KafkaException("missing scheme from kafka uri (must be kafka://)");
 
-    Producer<Integer, Message> producer = new Producer<Integer, Message>(new ProducerConfig(props));
-    return new KafkaRecordWriter<W>(producer, topic, queueSize);
+    Producer<Object, byte[]> producer = new Producer<Object, byte[]>(new ProducerConfig(props));
+    return new KafkaRecordWriter<K, V>(producer, topic, queueSize);
   }
 }
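
With the synthetic broker-id prefixes gone, the broker list is now simply the URI authority and the topic is the path. A small JDK-only sketch (illustrative, not part of the patch) of the same parsing that getRecordWriter() performs:

    import java.net.URI;

    public class KafkaUriExample
    {
      public static void main(String[] args) throws Exception
      {
        URI uri = new URI("kafka://kafka-server:9000,kafka-server2:9000/foobar");
        String brokerList = uri.getAuthority();     // "kafka-server:9000,kafka-server2:9000" -> broker.list
        String topic = uri.getPath().substring(1);  // "foobar" (leading '/' dropped) -> kafka.output.topic
        System.out.println(brokerList + " / " + topic);
      }
    }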

http://git-wip-us.apache.org/repos/asf/kafka/blob/295734f6/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
index 8c84786..a381ccd 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
@@ -16,49 +16,58 @@
  */
 package kafka.bridge.hadoop;
 
-
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
-import kafka.message.Message;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-public class KafkaRecordWriter<W extends BytesWritable> extends RecordWriter<NullWritable, W>
+public class KafkaRecordWriter<K,V> extends RecordWriter<K,V>
 {
-  protected Producer<Integer, Message> producer;
+  protected Producer<Object, byte[]> producer;
   protected String topic;
 
-  protected List<KeyedMessage<Integer, Message>> msgList = new LinkedList<KeyedMessage<Integer, Message>>();
+  protected List<KeyedMessage<Object, byte[]>> msgList = new LinkedList<KeyedMessage<Object, byte[]>>();
   protected int totalSize = 0;
   protected int queueSize;
 
-  public KafkaRecordWriter(Producer<Integer, Message> producer, String topic, int queueSize)
+  public KafkaRecordWriter(Producer<Object, byte[]> producer, String topic, int queueSize)
   {
     this.producer = producer;
     this.topic = topic;
     this.queueSize = queueSize;
   }
 
-  protected void sendMsgList()
+  protected void sendMsgList() throws IOException
   {
     if (msgList.size() > 0) {
-      producer.send(msgList);
+      try {
+        producer.send(msgList);
+      }
+      catch (Exception e) {
+        throw new IOException(e);           // all Kafka exceptions become IOExceptions
+      }
       msgList.clear();
       totalSize = 0;
     }
   }
 
   @Override
-  public void write(NullWritable key, BytesWritable value) throws IOException, InterruptedException
+  public void write(K key, V value) throws IOException, InterruptedException
   {
-    Message msg = new Message(value.getBytes());
-    msgList.add(new KeyedMessage<Integer, Message>(this.topic, msg));
-    totalSize += msg.size();
+    byte[] valBytes;
+    if (value instanceof byte[])
+      valBytes = (byte[]) value;
+    else if (value instanceof BytesWritable)
+      valBytes = ((BytesWritable) value).getBytes();
+    else
+      throw new IllegalArgumentException("KafkaRecordWriter expects byte array value to publish");
+
+    msgList.add(new KeyedMessage<Object, byte[]>(this.topic, key, valBytes));
+    totalSize += valBytes.length;
 
     // MultiProducerRequest only supports sending up to Short.MAX_VALUE messages in one batch
     if (totalSize > queueSize || msgList.size() >= Short.MAX_VALUE)

http://git-wip-us.apache.org/repos/asf/kafka/blob/295734f6/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
index faa1950..d24a85a 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
@@ -16,7 +16,6 @@
  */
 package kafka.bridge.pig;
 
-
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -25,8 +24,6 @@ import kafka.bridge.hadoop.KafkaRecordWriter;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.Encoder;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -38,7 +35,7 @@ import org.apache.pig.piggybank.storage.avro.PigSchema2Avro;
 
 public class AvroKafkaStorage extends StoreFunc
 {
-  protected KafkaRecordWriter writer;
+  protected KafkaRecordWriter<Object, byte[]> writer;
   protected org.apache.avro.Schema avroSchema;
   protected PigAvroDatumWriter datumWriter;
   protected Encoder encoder;
@@ -68,6 +65,7 @@ public class AvroKafkaStorage extends StoreFunc
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public void prepareToWrite(RecordWriter writer) throws IOException
   {
     if (this.avroSchema == null)
@@ -108,7 +106,7 @@ public class AvroKafkaStorage extends StoreFunc
     this.encoder.flush();
 
     try {
-      this.writer.write(NullWritable.get(), new BytesWritable(this.os.toByteArray()));
+      this.writer.write(null, this.os.toByteArray());
     }
     catch (InterruptedException e) {
       throw new IOException(e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/295734f6/project/build/KafkaProject.scala
----------------------------------------------------------------------
diff --git a/project/build/KafkaProject.scala b/project/build/KafkaProject.scala
index 48d1930..fac723a 100644
--- a/project/build/KafkaProject.scala
+++ b/project/build/KafkaProject.scala
@@ -239,7 +239,7 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
          <dependency org="org.apache.hadoop" name="hadoop-core" rev="0.20.2">
            <exclude module="junit"/>
          </dependency>
-         <dependency org="org.apache.pig" name="pig" rev="0.8.0">
+         <dependency org="org.apache.pig" name="pig" rev="0.10.0">
            <exclude module="junit"/>
          </dependency>
        </dependencies>