Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/04 05:22:03 UTC
[4/37] git commit: KAFKA-713 Update Hadoop producer for Kafka 0.8
changes; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/295734f6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/295734f6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/295734f6
Branch: refs/heads/trunk
Commit: 295734f6abb69d0c184faa0085ca198040b2adbf
Parents: 2ae0690
Author: Sam Shah <sh...@umich.edu>
Authored: Fri Feb 1 10:46:00 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Feb 1 10:46:02 2013 -0800
----------------------------------------------------------------------
contrib/hadoop-producer/README.md | 119 ++++-----------
.../java/kafka/bridge/examples/TextPublisher.java | 14 +-
.../kafka/bridge/hadoop/KafkaOutputFormat.java | 103 +++++--------
.../kafka/bridge/hadoop/KafkaRecordWriter.java | 35 +++--
.../java/kafka/bridge/pig/AvroKafkaStorage.java | 8 +-
project/build/KafkaProject.scala | 2 +-
6 files changed, 108 insertions(+), 173 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/295734f6/contrib/hadoop-producer/README.md
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/README.md b/contrib/hadoop-producer/README.md
index 6e57fde..1bd3721 100644
--- a/contrib/hadoop-producer/README.md
+++ b/contrib/hadoop-producer/README.md
@@ -4,16 +4,20 @@ Hadoop to Kafka Bridge
What's new?
-----------
-* Now supports Kafka's software load balancer (Kafka URIs are specified with
- kafka+zk as the scheme, as described below)
-* Supports Kafka 0.7. Now uses the new Producer API, rather than the legacy
- SyncProducer.
+* Kafka 0.8 support
+ * No more ZK-based load balancing (backwards incompatible change)
+* Semantic partitioning is now supported in KafkaOutputFormat. Just specify a
+ key in the output committer of your job. The Pig StoreFunc doesn't support
+ semantic partitioning.
+* Config parameters are now the same as the Kafka producer, just prepended with
+ kafka.output (e.g., kafka.output.max.message.size). This is a backwards
+ incompatible change.
What is it?
-----------
The Hadoop to Kafka bridge is a way to publish data from Hadoop to Kafka. There
-are two possible mechanisms, varying from easy to difficult: writing a Pig
+are two possible mechanisms, varying from easy to difficult: writing a Pig
script and writing messages in Avro format, or rolling your own job using the
Kafka `OutputFormat`.
@@ -25,10 +29,8 @@ multiple times in the same push.
How do I use it?
----------------
-With this bridge, Kafka topics are URIs and are specified in one of two
-formats: `kafka+zk://<zk-path>#<kafka-topic>`, which uses the software load
-balancer, or the legacy `kafka://<kafka-server>/<kafka-topic>` to connect to a
-specific Kafka broker.
+With this bridge, Kafka topics are URIs and are specified as URIs of the form
+`kafka://<kafka-server>/<kafka-topic>` to connect to a specific Kafka broker.
### Pig ###
@@ -37,19 +39,17 @@ row. To push data via Kafka, store to the Kafka URI using `AvroKafkaStorage`
with the Avro schema as its first argument. You'll need to register the
appropriate Kafka JARs. Here is what an example Pig script looks like:
- REGISTER hadoop-producer_2.8.0-0.7.0.jar;
+ REGISTER hadoop-producer_2.8.0-0.8.0.jar;
REGISTER avro-1.4.0.jar;
REGISTER piggybank.jar;
- REGISTER kafka-0.7.0.jar;
+ REGISTER kafka-0.8.0.jar;
REGISTER jackson-core-asl-1.5.5.jar;
REGISTER jackson-mapper-asl-1.5.5.jar;
- REGISTER zkclient-20110412.jar;
- REGISTER zookeeper-3.3.4.jar;
REGISTER scala-library.jar;
- member_info = LOAD 'member_info.tsv' as (member_id : int, name : chararray);
+ member_info = LOAD 'member_info.tsv' AS (member_id : int, name : chararray);
names = FOREACH member_info GENERATE name;
- STORE member_info INTO 'kafka+zk://my-zookeeper:2181/kafka#member_info' USING kafka.bridge.AvroKafkaStorage('"string"');
+ STORE member_info INTO 'kafka://my-kafka:9092/member_info' USING kafka.bridge.AvroKafkaStorage('"string"');
That's it! The Pig StoreFunc makes use of AvroStorage in Piggybank to convert
from Pig's data model to the specified Avro schema.
@@ -58,8 +58,8 @@ Further, multi-store is possible with KafkaStorage, so you can easily write to
multiple topics and brokers in the same job:
SPLIT member_info INTO early_adopters IF member_id < 1000, others IF member_id >= 1000;
- STORE early_adopters INTO 'kafka+zk://my-zookeeper:2181/kafka#early_adopters' USING AvroKafkaStorage('$schema');
- STORE others INTO 'kafka://my-broker:9092,my-broker2:9092/others' USING AvroKafkaStorage('$schema');
+ STORE early_adopters INTO 'kafka://my-broker:9092/early_adopters' USING AvroKafkaStorage('$schema');
+ STORE others INTO 'kafka://my-broker2:9092/others' USING AvroKafkaStorage('$schema');
### KafkaOutputFormat ###
@@ -68,80 +68,27 @@ uses the newer 0.20 mapreduce APIs and simply pushes bytes (i.e.,
BytesWritable). This is a lower-level method of publishing data, as it allows
you to precisely control output.
-Here is an example that publishes some input text. With KafkaOutputFormat, the
-key is a NullWritable and is ignored; only values are published. Speculative
-execution is turned off by the OutputFormat.
-
- import kafka.bridge.hadoop.KafkaOutputFormat;
-
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.BytesWritable;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-
- import java.io.IOException;
-
- public class TextPublisher
- {
- public static void main(String[] args) throws Exception
- {
- if (args.length != 2) {
- System.err.println("usage: <input path> <kafka output url>");
- return;
- }
-
- Job job = new Job();
-
- job.setJarByClass(TextPublisher.class);
- job.setOutputKeyClass(NullWritable.class);
- job.setOutputValueClass(BytesWritable.class);
- job.setInputFormatClass(TextInputFormat.class);
- job.setOutputFormatClass(KafkaOutputFormat.class);
-
- job.setMapperClass(TheMapper.class);
- job.setNumReduceTasks(0);
-
- FileInputFormat.addInputPath(job, new Path(args[0]));
- KafkaOutputFormat.setOutputPath(job, new Path(args[1]));
-
- if (!job.waitForCompletion(true)) {
- throw new RuntimeException("Job failed!");
- }
- }
-
- public static class TheMapper extends Mapper<Object, Object, NullWritable, BytesWritable>
- {
- @Override
- protected void map(Object key, Object value, Context context) throws IOException, InterruptedException
- {
- context.write(NullWritable.get(), new BytesWritable(((Text) value).getBytes()));
- }
- }
- }
+Included is an example that publishes some input text line-by-line to a topic.
+With KafkaOutputFormat, the key can be a null, where it is ignored by the
+producer (random partitioning), or any object for semantic partitioning of the
+stream (with an appropriate Kafka partitioner set). Speculative execution is
+turned off by the OutputFormat.
What can I tune?
----------------
-Normally, you needn't change any of these parameters:
-
-* kafka.output.queue_size: Bytes to queue in memory before pushing to the Kafka
+* kafka.output.queue.size: Bytes to queue in memory before pushing to the Kafka
producer (i.e., the batch size). Default is 10*1024*1024 (10MB).
-* kafka.output.connect_timeout: Connection timeout in milliseconds (see Kafka
- producer docs). Default is 30*1000 (30s).
-* kafka.output.reconnect_timeout: Milliseconds to wait until attempting
- reconnection (see Kafka producer docs). Default is 1000 (1s).
-* kafka.output.bufsize: Producer buffer size in bytes (see Kafka producer
- docs). Default is 64*1024 (64KB).
-* kafka.output.max_msgsize: Maximum message size in bytes (see Kafka producer
- docs). Default is 1024*1024 (1MB).
-* kafka.output.compression_codec: The compression codec to use (see Kafka producer
- docs). Default is 0 (no compression).
+
+Any of Kafka's producer parameters can be changed by prefixing them with
+"kafka.output" in one's job configuration. For example, to change the
+compression codec, one would add the "kafka.output.compression.codec" parameter
+(e.g., "SET kafka.output.compression.codec 0" in one's Pig script for no
+compression).
For easier debugging, the above values as well as the Kafka broker information
-(either kafka.zk.connect or kafka.broker.list), the topic (kafka.output.topic),
-and the schema (kafka.output.schema) are injected into the job's configuration.
+(kafka.broker.list), the topic (kafka.output.topic), and the schema
+(kafka.output.schema) are injected into the job's configuration. By default,
+the Hadoop producer uses Kafka's sync producer as asynchronous operation
+doesn't make sense in the batch Hadoop case.
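
Editorial note: the prefix convention described in the README section above can be sketched with the standard library alone. This is a minimal illustration, not the committed code. The class and method names (`PrefixedConfigSketch`, `toProducerProps`) are hypothetical, a plain `Map` stands in for the Hadoop job configuration, and the defaults are copied from the static block in this commit's KafkaOutputFormat. Unlike the committed loop, the sketch requires the trailing dot after the prefix, which is an assumption made here to avoid an out-of-range `substring` on a key that equals the bare prefix.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

/** Illustrative only: mirrors the "kafka.output" prefix convention without Hadoop or Kafka. */
final class PrefixedConfigSketch {
    static final String PREFIX = "kafka.output";
    static final String URL_KEY = PREFIX + ".url";

    /** Copies every "kafka.output.<name>" entry into producer properties as "<name>". */
    static Properties toProducerProps(Map<String, String> jobConf) {
        Properties props = new Properties();
        // Defaults as listed in this commit's KafkaOutputFormat static block.
        props.setProperty("producer.type", "sync");
        props.setProperty("send.buffer.bytes", Integer.toString(64 * 1024));
        props.setProperty("compression.codec", "1");

        for (Map.Entry<String, String> e : jobConf.entrySet()) {
            String key = e.getKey();
            // Skip unrelated keys and the output URL, which is not a producer setting.
            // Requiring the trailing dot is a choice made for this sketch.
            if (!key.startsWith(PREFIX + ".") || key.equals(URL_KEY)) {
                continue;
            }
            props.setProperty(key.substring(PREFIX.length() + 1), e.getValue());
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> jobConf = new LinkedHashMap<>();
        jobConf.put("kafka.output.compression.codec", "0"); // override a default
        jobConf.put("kafka.output.url", "kafka://my-kafka:9092/member_info");
        jobConf.put("mapred.job.name", "unrelated");
        System.out.println(toProducerProps(jobConf));
    }
}
```

With the sample configuration in `main`, the override replaces the default codec, while the URL and the unrelated key are left out of the producer properties.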
http://git-wip-us.apache.org/repos/asf/kafka/blob/295734f6/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
index 5acbcee..d447b1d 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
@@ -16,18 +16,18 @@
*/
package kafka.bridge.examples;
-
import java.io.IOException;
import kafka.bridge.hadoop.KafkaOutputFormat;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+/**
+ * Publish a text file line by line to a Kafka topic
+ */
public class TextPublisher
{
public static void main(String[] args) throws Exception
@@ -40,8 +40,6 @@ public class TextPublisher
Job job = new Job();
job.setJarByClass(TextPublisher.class);
- job.setOutputKeyClass(NullWritable.class);
- job.setOutputValueClass(BytesWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(KafkaOutputFormat.class);
@@ -56,12 +54,12 @@ public class TextPublisher
}
}
- public static class TheMapper extends Mapper<Object, Object, NullWritable, BytesWritable>
+ public static class TheMapper extends Mapper<Object, Text, Object, Object>
{
@Override
- protected void map(Object key, Object value, Context context) throws IOException, InterruptedException
+ protected void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
- context.write(NullWritable.get(), new BytesWritable(((Text) value).getBytes()));
+ context.write(null, value.getBytes());
}
}
}
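
Editorial note: the mapper above publishes `value.getBytes()`. Hadoop documents that `Text.getBytes()` returns the backing array and that only the first `getLength()` bytes are valid, so a reused `Text` may carry stale trailing bytes. The sketch below shows one way to guard against that, using a plain byte array as a stand-in for the reused buffer. The helper name `validBytes` is hypothetical, and whether stale bytes actually occur depends on how the input format reuses its value object.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Illustrative only: trims a reused backing array to its valid length. */
final class ValidBytesSketch {
    /** Returns a copy of only the first {@code length} bytes of a possibly larger array. */
    static byte[] validBytes(byte[] backing, int length) {
        return Arrays.copyOf(backing, length);
    }

    public static void main(String[] args) {
        // Simulate buffer reuse: a long record is written first, then a shorter one over it.
        byte[] backing = "hello world".getBytes(StandardCharsets.UTF_8);
        byte[] shorter = "hi".getBytes(StandardCharsets.UTF_8);
        System.arraycopy(shorter, 0, backing, 0, shorter.length);
        int length = shorter.length;

        // The raw backing array still contains the tail of the earlier record.
        System.out.println(new String(backing, StandardCharsets.UTF_8));
        // The trimmed copy contains only the current record.
        System.out.println(new String(validBytes(backing, length), StandardCharsets.UTF_8));
    }
}
```

In a real mapper the equivalent call would be along the lines of `Arrays.copyOf(value.getBytes(), value.getLength())`.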
http://git-wip-us.apache.org/repos/asf/kafka/blob/295734f6/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
index 2fd2035..aa1f944 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
@@ -16,19 +16,16 @@
*/
package kafka.bridge.hadoop;
-
import java.io.IOException;
import java.net.URI;
-import java.util.Properties;
+import java.util.*;
import kafka.common.KafkaException;
import kafka.javaapi.producer.Producer;
-import kafka.message.Message;
import kafka.producer.ProducerConfig;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -38,26 +35,26 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.log4j.Logger;
-public class KafkaOutputFormat<W extends BytesWritable> extends OutputFormat<NullWritable, W>
+public class KafkaOutputFormat<K, V> extends OutputFormat<K, V>
{
private Logger log = Logger.getLogger(KafkaOutputFormat.class);
public static final String KAFKA_URL = "kafka.output.url";
- /** Bytes to buffer before the OutputFormat does a send */
+ /** Bytes to buffer before the OutputFormat does a send (i.e., the amortization window) */
public static final int KAFKA_QUEUE_SIZE = 10*1024*1024;
- /** Default value for Kafka's connect.timeout.ms */
- public static final int KAFKA_PRODUCER_CONNECT_TIMEOUT = 30*1000;
- /** Default value for Kafka's reconnect.interval*/
- public static final int KAFKA_PRODUCER_RECONNECT_INTERVAL = 1000;
- /** Default value for Kafka's buffer.size */
- public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64*1024;
- /** Default value for Kafka's max.message.size */
- public static final int KAFKA_PRODUCER_MAX_MESSAGE_SIZE = 1024*1024;
- /** Default value for Kafka's producer.type */
- public static final String KAFKA_PRODUCER_PRODUCER_TYPE = "sync";
- /** Default value for Kafka's compression.codec */
- public static final int KAFKA_PRODUCER_COMPRESSION_CODEC = 0;
+ public static final String KAFKA_CONFIG_PREFIX = "kafka.output";
+ private static final Map<String, String> kafkaConfigMap;
+ static {
+ Map<String, String> cMap = new HashMap<String, String>();
+
+ // default Hadoop producer configs
+ cMap.put("producer.type", "sync");
+ cMap.put("send.buffer.bytes", Integer.toString(64*1024));
+ cMap.put("compression.codec", Integer.toString(1));
+
+ kafkaConfigMap = Collections.unmodifiableMap(cMap);
+ }
public KafkaOutputFormat()
{
@@ -91,7 +88,7 @@ public class KafkaOutputFormat<W extends BytesWritable> extends OutputFormat<Nul
}
@Override
- public RecordWriter<NullWritable, W> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException
+ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException
{
Path outputPath = getOutputPath(context);
if (outputPath == null)
@@ -102,58 +99,44 @@ public class KafkaOutputFormat<W extends BytesWritable> extends OutputFormat<Nul
Properties props = new Properties();
String topic;
- final int queueSize = job.getInt("kafka.output.queue_size", KAFKA_QUEUE_SIZE);
- final int timeout = job.getInt("kafka.output.connect_timeout", KAFKA_PRODUCER_CONNECT_TIMEOUT);
- final int interval = job.getInt("kafka.output.reconnect_interval", KAFKA_PRODUCER_RECONNECT_INTERVAL);
- final int bufSize = job.getInt("kafka.output.bufsize", KAFKA_PRODUCER_BUFFER_SIZE);
- final int maxSize = job.getInt("kafka.output.max_msgsize", KAFKA_PRODUCER_MAX_MESSAGE_SIZE);
- final String producerType = job.get("kafka.output.producer_type", KAFKA_PRODUCER_PRODUCER_TYPE);
- final int compressionCodec = job.getInt("kafka.output.compression_codec", KAFKA_PRODUCER_COMPRESSION_CODEC);
-
- job.setInt("kafka.output.queue_size", queueSize);
- job.setInt("kafka.output.connect_timeout", timeout);
- job.setInt("kafka.output.reconnect_interval", interval);
- job.setInt("kafka.output.bufsize", bufSize);
- job.setInt("kafka.output.max_msgsize", maxSize);
- job.set("kafka.output.producer_type", producerType);
- job.setInt("kafka.output.compression_codec", compressionCodec);
-
- props.setProperty("producer.type", producerType);
- props.setProperty("send.buffer.bytes", Integer.toString(bufSize));
- props.setProperty("connect.timeout.ms", Integer.toString(timeout));
- props.setProperty("reconnect.interval", Integer.toString(interval));
- props.setProperty("compression.codec", Integer.toString(compressionCodec));
+ props.putAll(kafkaConfigMap); // inject default configuration
+ for (Map.Entry<String, String> m : job) { // handle any overrides
+ if (!m.getKey().startsWith(KAFKA_CONFIG_PREFIX))
+ continue;
+ if (m.getKey().equals(KAFKA_URL))
+ continue;
+
+ String kafkaKeyName = m.getKey().substring(KAFKA_CONFIG_PREFIX.length()+1);
+ props.setProperty(kafkaKeyName, m.getValue()); // set Kafka producer property
+ }
+
+ // inject Kafka producer props back into jobconf for easier debugging
+ for (Map.Entry<Object, Object> m : props.entrySet()) {
+ job.set(KAFKA_CONFIG_PREFIX + "." + m.getKey().toString(), m.getValue().toString());
+ }
+
+ // KafkaOutputFormat specific parameters
+ final int queueSize = job.getInt(KAFKA_CONFIG_PREFIX + ".queue.size", KAFKA_QUEUE_SIZE);
+ job.setInt(KAFKA_CONFIG_PREFIX + ".queue.size", queueSize);
if (uri.getScheme().equals("kafka")) {
- // using the legacy direct broker list
+ // using the direct broker list
// URL: kafka://<kafka host>/<topic>
// e.g. kafka://kafka-server:9000,kafka-server2:9000/foobar
-
- // Just enumerate broker_ids, as it really doesn't matter what they are as long as they're unique
- // (KAFKA-258 will remove the broker_id requirement)
- StringBuilder brokerListBuilder = new StringBuilder();
- String delim = "";
- int brokerId = 0;
- for (String serverPort : uri.getAuthority().split(",")) {
- brokerListBuilder.append(delim).append(String.format("%d:%s", brokerId, serverPort));
- delim = ",";
- brokerId++;
- }
- String brokerList = brokerListBuilder.toString();
-
+ String brokerList = uri.getAuthority();
props.setProperty("broker.list", brokerList);
- job.set("kafka.broker.list", brokerList);
+ job.set(KAFKA_CONFIG_PREFIX + ".broker.list", brokerList);
if (uri.getPath() == null || uri.getPath().length() <= 1)
throw new KafkaException("no topic specified in kafka uri");
- topic = uri.getPath().substring(1); // ignore the initial '/' in the path
- job.set("kafka.output.topic", topic);
+ topic = uri.getPath().substring(1); // ignore the initial '/' in the path
+ job.set(KAFKA_CONFIG_PREFIX + ".topic", topic);
log.info(String.format("using kafka broker %s (topic %s)", brokerList, topic));
} else
throw new KafkaException("missing scheme from kafka uri (must be kafka://)");
- Producer<Integer, Message> producer = new Producer<Integer, Message>(new ProducerConfig(props));
- return new KafkaRecordWriter<W>(producer, topic, queueSize);
+ Producer<Object, byte[]> producer = new Producer<Object, byte[]>(new ProducerConfig(props));
+ return new KafkaRecordWriter<K, V>(producer, topic, queueSize);
}
}
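
Editorial note: the URL handling in `getRecordWriter` above reduces to splitting a `kafka://<brokers>/<topic>` URI into its authority and path. The following is a stdlib-only sketch of that step under a few assumptions: the class name `KafkaUriSketch` is hypothetical, `IllegalArgumentException` stands in for `KafkaException`, and the scheme comparison is written null-safe, which differs from the committed `uri.getScheme().equals("kafka")`.

```java
import java.net.URI;

/** Illustrative only: extracts the broker list and topic from a kafka:// URI. */
final class KafkaUriSketch {
    /** Returns {brokerList, topic} for a URI of the form kafka://<brokers>/<topic>. */
    static String[] parse(String url) {
        URI uri = URI.create(url);
        // Null-safe comparison: a URI without any scheme is rejected rather than causing an NPE.
        if (!"kafka".equals(uri.getScheme())) {
            throw new IllegalArgumentException("missing scheme from kafka uri (must be kafka://)");
        }
        String path = uri.getPath();
        if (path == null || path.length() <= 1) {
            throw new IllegalArgumentException("no topic specified in kafka uri");
        }
        // The authority is passed through as the broker list; the leading '/' is dropped from the topic.
        return new String[] { uri.getAuthority(), path.substring(1) };
    }

    public static void main(String[] args) {
        String[] parts = parse("kafka://kafka-server:9000,kafka-server2:9000/foobar");
        System.out.println("brokers=" + parts[0] + " topic=" + parts[1]);
    }
}
```

A comma-separated authority does not parse as a single host and port, so `getHost()` would be null for such a URI. Reading `getAuthority()` as a whole, as the committed code does, sidesteps that.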
http://git-wip-us.apache.org/repos/asf/kafka/blob/295734f6/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
index 8c84786..a381ccd 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
@@ -16,49 +16,58 @@
*/
package kafka.bridge.hadoop;
-
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
-import kafka.message.Message;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-public class KafkaRecordWriter<W extends BytesWritable> extends RecordWriter<NullWritable, W>
+public class KafkaRecordWriter<K,V> extends RecordWriter<K,V>
{
- protected Producer<Integer, Message> producer;
+ protected Producer<Object, byte[]> producer;
protected String topic;
- protected List<KeyedMessage<Integer, Message>> msgList = new LinkedList<KeyedMessage<Integer, Message>>();
+ protected List<KeyedMessage<Object, byte[]>> msgList = new LinkedList<KeyedMessage<Object, byte[]>>();
protected int totalSize = 0;
protected int queueSize;
- public KafkaRecordWriter(Producer<Integer, Message> producer, String topic, int queueSize)
+ public KafkaRecordWriter(Producer<Object, byte[]> producer, String topic, int queueSize)
{
this.producer = producer;
this.topic = topic;
this.queueSize = queueSize;
}
- protected void sendMsgList()
+ protected void sendMsgList() throws IOException
{
if (msgList.size() > 0) {
- producer.send(msgList);
+ try {
+ producer.send(msgList);
+ }
+ catch (Exception e) {
+ throw new IOException(e); // all Kafka exceptions become IOExceptions
+ }
msgList.clear();
totalSize = 0;
}
}
@Override
- public void write(NullWritable key, BytesWritable value) throws IOException, InterruptedException
+ public void write(K key, V value) throws IOException, InterruptedException
{
- Message msg = new Message(value.getBytes());
- msgList.add(new KeyedMessage<Integer, Message>(this.topic, msg));
- totalSize += msg.size();
+ byte[] valBytes;
+ if (value instanceof byte[])
+ valBytes = (byte[]) value;
+ else if (value instanceof BytesWritable)
+ valBytes = ((BytesWritable) value).getBytes();
+ else
+ throw new IllegalArgumentException("KafkaRecordWriter expects byte array value to publish");
+
+ msgList.add(new KeyedMessage<Object, byte[]>(this.topic, key, valBytes));
+ totalSize += valBytes.length;
// MultiProducerRequest only supports sending up to Short.MAX_VALUE messages in one batch
if (totalSize > queueSize || msgList.size() >= Short.MAX_VALUE)
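
Editorial note: the hunk above is cut off at the flush condition. Presumably that condition triggers `sendMsgList()`, but the truncated diff does not show it. The batching idea can be sketched without Kafka as follows. All names here are hypothetical, a `Consumer` stands in for the producer, and flushing on demand at the end is an assumption about how the writer is closed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Illustrative only: buffers byte[] messages and hands them to a sink in batches. */
final class BatchingWriterSketch {
    private final Consumer<List<byte[]>> sink;
    private final int queueSize;
    private final List<byte[]> batch = new ArrayList<>();
    private int totalSize = 0;

    BatchingWriterSketch(Consumer<List<byte[]>> sink, int queueSize) {
        this.sink = sink;
        this.queueSize = queueSize;
    }

    /** Buffers one message and flushes once the byte or message-count threshold is crossed. */
    void write(byte[] value) {
        batch.add(value);
        totalSize += value.length;
        // Same condition as in the diff: byte budget exceeded, or too many messages for one request.
        if (totalSize > queueSize || batch.size() >= Short.MAX_VALUE) {
            flush();
        }
    }

    /** Sends whatever is buffered; a no-op when the buffer is empty. */
    void flush() {
        if (!batch.isEmpty()) {
            sink.accept(new ArrayList<>(batch)); // hand over a copy so clearing is safe
            batch.clear();
            totalSize = 0;
        }
    }

    public static void main(String[] args) {
        BatchingWriterSketch writer =
            new BatchingWriterSketch(b -> System.out.println("sent " + b.size() + " messages"), 10);
        for (int i = 0; i < 4; i++) {
            writer.write(new byte[4]);
        }
        writer.flush(); // flush the remainder, as a close() would
    }
}
```

With a 10-byte budget and four 4-byte messages, the third write crosses the threshold and sends a batch of three. The final `flush()` sends the remaining one.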
http://git-wip-us.apache.org/repos/asf/kafka/blob/295734f6/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
index faa1950..d24a85a 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
@@ -16,7 +16,6 @@
*/
package kafka.bridge.pig;
-
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
@@ -25,8 +24,6 @@ import kafka.bridge.hadoop.KafkaRecordWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Encoder;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
@@ -38,7 +35,7 @@ import org.apache.pig.piggybank.storage.avro.PigSchema2Avro;
public class AvroKafkaStorage extends StoreFunc
{
- protected KafkaRecordWriter writer;
+ protected KafkaRecordWriter<Object, byte[]> writer;
protected org.apache.avro.Schema avroSchema;
protected PigAvroDatumWriter datumWriter;
protected Encoder encoder;
@@ -68,6 +65,7 @@ public class AvroKafkaStorage extends StoreFunc
}
@Override
+ @SuppressWarnings("unchecked")
public void prepareToWrite(RecordWriter writer) throws IOException
{
if (this.avroSchema == null)
@@ -108,7 +106,7 @@ public class AvroKafkaStorage extends StoreFunc
this.encoder.flush();
try {
- this.writer.write(NullWritable.get(), new BytesWritable(this.os.toByteArray()));
+ this.writer.write(null, this.os.toByteArray());
}
catch (InterruptedException e) {
throw new IOException(e);
http://git-wip-us.apache.org/repos/asf/kafka/blob/295734f6/project/build/KafkaProject.scala
----------------------------------------------------------------------
diff --git a/project/build/KafkaProject.scala b/project/build/KafkaProject.scala
index 48d1930..fac723a 100644
--- a/project/build/KafkaProject.scala
+++ b/project/build/KafkaProject.scala
@@ -239,7 +239,7 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
<dependency org="org.apache.hadoop" name="hadoop-core" rev="0.20.2">
<exclude module="junit"/>
</dependency>
- <dependency org="org.apache.pig" name="pig" rev="0.8.0">
+ <dependency org="org.apache.pig" name="pig" rev="0.10.0">
<exclude module="junit"/>
</dependency>
</dependencies>