You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:03:56 UTC
[11/36] git commit: KAFKA-991;
Reduce the queue size in hadoop producer;
patched by Swapnil Ghike, reviewed by Jay Kreps and Joel Koshy.
KAFKA-991; Reduce the queue size in hadoop producer; patched by Swapnil Ghike, reviewed by Jay Kreps and Joel Koshy.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8edd3e63
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8edd3e63
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8edd3e63
Branch: refs/heads/trunk
Commit: 8edd3e63024832f2c05b5819ab3dfc2e0b300729
Parents: 76d3905
Author: Joel Koshy <jj...@gmail.com>
Authored: Thu Aug 1 14:58:25 2013 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Thu Aug 1 14:58:25 2013 -0700
----------------------------------------------------------------------
.../src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java | 7 +++++--
core/src/main/scala/kafka/server/KafkaConfig.scala | 4 ++--
2 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8edd3e63/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
index 0b435b9..709a609 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
@@ -40,8 +40,11 @@ public class KafkaOutputFormat<K, V> extends OutputFormat<K, V>
private Logger log = Logger.getLogger(KafkaOutputFormat.class);
public static final String KAFKA_URL = "kafka.output.url";
- /** Bytes to buffer before the OutputFormat does a send (i.e., the amortization window) */
- public static final int KAFKA_QUEUE_SIZE = 10*1024*1024;
+ /** Bytes to buffer before the OutputFormat does a send (i.e., the amortization window):
+ * We set the default to a million bytes so that the server will not reject the batch of messages
+ * with a MessageSizeTooLargeException. The actual size will be smaller after compression.
+ */
+ public static final int KAFKA_QUEUE_SIZE = 1000000;
public static final String KAFKA_CONFIG_PREFIX = "kafka.output";
private static final Map<String, String> kafkaConfigMap;
http://git-wip-us.apache.org/repos/asf/kafka/blob/8edd3e63/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index b774431..41c9626 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -18,7 +18,7 @@
package kafka.server
import java.util.Properties
-import kafka.message.Message
+import kafka.message.{MessageSet, Message}
import kafka.consumer.ConsumerConfig
import kafka.utils.{VerifiableProperties, ZKConfig, Utils}
@@ -38,7 +38,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue))
/* the maximum size of message that the server can receive */
- val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000, (0, Int.MaxValue))
+ val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000 + MessageSet.LogOverhead, (0, Int.MaxValue))
/* the number of network threads that the server uses for handling network requests */
val numNetworkThreads = props.getIntInRange("num.network.threads", 3, (1, Int.MaxValue))