You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:03:57 UTC

[12/36] git commit: KAFKA-991; Rename config queue size to queue bytes in hadoop producer; patched by Swapnil Ghike, reviewed by Joel Koshy.

KAFKA-991; Rename config queue size to queue bytes in hadoop producer; patched by Swapnil Ghike, reviewed by Joel Koshy.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b6479e8a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b6479e8a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b6479e8a

Branch: refs/heads/trunk
Commit: b6479e8adca6be8e363d289be29dc755e0655550
Parents: 8edd3e6
Author: Joel Koshy <jj...@gmail.com>
Authored: Thu Aug 1 17:11:58 2013 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Thu Aug 1 17:11:58 2013 -0700

----------------------------------------------------------------------
 contrib/hadoop-producer/README.md               |  4 ++--
 .../kafka/bridge/hadoop/KafkaOutputFormat.java  |  7 +++----
 .../kafka/bridge/hadoop/KafkaRecordWriter.java  | 20 +++++++++++---------
 3 files changed, 16 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b6479e8a/contrib/hadoop-producer/README.md
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/README.md b/contrib/hadoop-producer/README.md
index 547c1ef..a5bef73 100644
--- a/contrib/hadoop-producer/README.md
+++ b/contrib/hadoop-producer/README.md
@@ -77,8 +77,8 @@ turned off by the OutputFormat.
 What can I tune?
 ----------------
 
-* kafka.output.queue.size: Bytes to queue in memory before pushing to the Kafka
-  producer (i.e., the batch size). Default is 10*1024*1024 (10MB).
+* kafka.output.queue.bytes: Bytes to queue in memory before pushing to the Kafka
+  producer (i.e., the batch size). Default is 1,000,000 (1 million) bytes.
 
 Any of Kafka's producer parameters can be changed by prefixing them with
 "kafka.output" in one's job configuration. For example, to change the

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6479e8a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
index 709a609..417b4b3 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
@@ -44,7 +44,7 @@ public class KafkaOutputFormat<K, V> extends OutputFormat<K, V>
    *  We set the default to a million bytes so that the server will not reject the batch of messages
    *  with a MessageSizeTooLargeException. The actual size will be smaller after compression.
    */
-  public static final int KAFKA_QUEUE_SIZE = 1000000;
+  public static final int KAFKA_QUEUE_BYTES = 1000000;
 
   public static final String KAFKA_CONFIG_PREFIX = "kafka.output";
   private static final Map<String, String> kafkaConfigMap;
@@ -119,8 +119,7 @@ public class KafkaOutputFormat<K, V> extends OutputFormat<K, V>
     }
 
     // KafkaOutputFormat specific parameters
-    final int queueSize = job.getInt(KAFKA_CONFIG_PREFIX + ".queue.size", KAFKA_QUEUE_SIZE);
-    job.setInt(KAFKA_CONFIG_PREFIX + ".queue.size", queueSize);
+    final int queueBytes = job.getInt(KAFKA_CONFIG_PREFIX + ".queue.bytes", KAFKA_QUEUE_BYTES);
 
     if (uri.getScheme().equals("kafka")) {
       // using the direct broker list
@@ -140,6 +139,6 @@ public class KafkaOutputFormat<K, V> extends OutputFormat<K, V>
       throw new KafkaException("missing scheme from kafka uri (must be kafka://)");
 
     Producer<Object, byte[]> producer = new Producer<Object, byte[]>(new ProducerConfig(props));
-    return new KafkaRecordWriter<K, V>(producer, topic, queueSize);
+    return new KafkaRecordWriter<K, V>(producer, topic, queueBytes);
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6479e8a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
index 6eea635..72c088d 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
@@ -32,14 +32,14 @@ public class KafkaRecordWriter<K,V> extends RecordWriter<K,V>
   protected String topic;
 
   protected List<KeyedMessage<Object, byte[]>> msgList = new LinkedList<KeyedMessage<Object, byte[]>>();
-  protected int totalSize = 0;
-  protected int queueSize;
+  protected int totalBytes = 0;
+  protected int queueBytes;
 
-  public KafkaRecordWriter(Producer<Object, byte[]> producer, String topic, int queueSize)
+  public KafkaRecordWriter(Producer<Object, byte[]> producer, String topic, int queueBytes)
   {
     this.producer = producer;
     this.topic = topic;
-    this.queueSize = queueSize;
+    this.queueBytes = queueBytes;
   }
 
   protected void sendMsgList() throws IOException
@@ -52,7 +52,7 @@ public class KafkaRecordWriter<K,V> extends RecordWriter<K,V>
         throw new IOException(e);           // all Kafka exceptions become IOExceptions
       }
       msgList.clear();
-      totalSize = 0;
+      totalBytes = 0;
     }
   }
 
@@ -69,12 +69,14 @@ public class KafkaRecordWriter<K,V> extends RecordWriter<K,V>
     else
       throw new IllegalArgumentException("KafkaRecordWriter expects byte array value to publish");
 
-    msgList.add(new KeyedMessage<Object, byte[]>(this.topic, key, valBytes));
-    totalSize += valBytes.length;
-
     // MultiProducerRequest only supports sending up to Short.MAX_VALUE messages in one batch
-    if (totalSize > queueSize || msgList.size() >= Short.MAX_VALUE)
+    // If adding the new message would push the queued bytes past queueBytes (1,000,000 by
+    // default), send the message list first.
+    if ((totalBytes + valBytes.length) > queueBytes || msgList.size() >= Short.MAX_VALUE)
       sendMsgList();
+
+    msgList.add(new KeyedMessage<Object, byte[]>(this.topic, key, valBytes));
+    totalBytes += valBytes.length;
   }
 
   @Override