You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:03:56 UTC

[11/36] git commit: KAFKA-991; Reduce the queue size in hadoop producer; patched by Swapnil Ghike, reviewed by Jay Kreps and Joel Koshy.

KAFKA-991; Reduce the queue size in hadoop producer; patched by Swapnil Ghike, reviewed by Jay Kreps and Joel Koshy.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8edd3e63
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8edd3e63
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8edd3e63

Branch: refs/heads/trunk
Commit: 8edd3e63024832f2c05b5819ab3dfc2e0b300729
Parents: 76d3905
Author: Joel Koshy <jj...@gmail.com>
Authored: Thu Aug 1 14:58:25 2013 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Thu Aug 1 14:58:25 2013 -0700

----------------------------------------------------------------------
 .../src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java  | 7 +++++--
 core/src/main/scala/kafka/server/KafkaConfig.scala            | 4 ++--
 2 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8edd3e63/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
index 0b435b9..709a609 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
@@ -40,8 +40,11 @@ public class KafkaOutputFormat<K, V> extends OutputFormat<K, V>
   private Logger log = Logger.getLogger(KafkaOutputFormat.class);
 
   public static final String KAFKA_URL = "kafka.output.url";
-  /** Bytes to buffer before the OutputFormat does a send (i.e., the amortization window) */
-  public static final int KAFKA_QUEUE_SIZE = 10*1024*1024;
+  /** Bytes to buffer before the OutputFormat does a send (i.e., the amortization window):
+   *  We set the default to a million bytes so that the server will not reject the batch of messages
+   *  with a MessageSizeTooLargeException. The actual size will be smaller after compression.
+   */
+  public static final int KAFKA_QUEUE_SIZE = 1000000;
 
   public static final String KAFKA_CONFIG_PREFIX = "kafka.output";
   private static final Map<String, String> kafkaConfigMap;

http://git-wip-us.apache.org/repos/asf/kafka/blob/8edd3e63/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index b774431..41c9626 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -18,7 +18,7 @@
 package kafka.server
 
 import java.util.Properties
-import kafka.message.Message
+import kafka.message.{MessageSet, Message}
 import kafka.consumer.ConsumerConfig
 import kafka.utils.{VerifiableProperties, ZKConfig, Utils}
 
@@ -38,7 +38,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue))
 
   /* the maximum size of message that the server can receive */
-  val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000, (0, Int.MaxValue))
+  val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000 + MessageSet.LogOverhead, (0, Int.MaxValue))
   
   /* the number of network threads that the server uses for handling network requests */
   val numNetworkThreads = props.getIntInRange("num.network.threads", 3, (1, Int.MaxValue))