Posted to commits@samza.apache.org by ja...@apache.org on 2019/02/19 21:49:12 UTC

[samza] branch master updated: SAMZA-2109: Reduce per-partition default buffer sizes

This is an automated email from the ASF dual-hosted git repository.

jagadish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new c9083e8  SAMZA-2109: Reduce per-partition default buffer sizes
c9083e8 is described below

commit c9083e843bdd4c219c1736aa855f106cd8d0c154
Author: Jagadish <jv...@linkedin.com>
AuthorDate: Tue Feb 19 13:48:56 2019 -0800

    SAMZA-2109: Reduce per-partition default buffer sizes
    
    Author: Jagadish <jv...@linkedin.com>
    
    Reviewers: Shanthoosh <sv...@linkedin.com>
    
    Closes #925 from vjagadish1989/samza-2109
---
 docs/learn/documentation/versioned/jobs/samza-configurations.md         | 2 +-
 .../main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java    | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 54ec12c..757d552 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -158,7 +158,7 @@ Samples found [here](../../../../startup/hello-samza/versioned)
 |--- |--- |--- |
 |systems.**_system-name_**.<br>consumer.*| |Any [Kafka consumer configuration](http://kafka.apache.org/documentation.html#newconsumerconfigs) can be included here. For example, to change the socket timeout, you can set `systems.system-name.consumer.socket.timeout.ms`. (There is no need to configure `group.id` or `client.id`, as they are automatically configured by Samza. Also, there is no need to set `auto.commit.enable` because Samza has its own checkpointing mechanism.)|
 |systems.**_system-name_**.<br>producer.*| |Any [Kafka producer configuration](http://kafka.apache.org/documentation.html#producerconfigs) can be included here. For example, to change the request timeout, you can set `systems.system-name.producer.timeout.ms`. (There is no need to configure `client.id` as it is automatically configured by Samza.)|
-|systems.**_system-name_**.<br>samza.fetch.threshold|5000|When consuming streams from Kafka, a Samza container maintains an in-memory buffer for incoming messages in order to increase throughput (the stream task can continue processing buffered messages while new messages are fetched from Kafka). This parameter determines the number of messages we aim to buffer across all stream partitions consumed by a container. For example, if a container consumes 50 partitions, it will try to buffer  [...]
+|systems.**_system-name_**.<br>samza.fetch.threshold|10000|When consuming streams from Kafka, a Samza container maintains an in-memory buffer for incoming messages in order to increase throughput (the stream task can continue processing buffered messages while new messages are fetched from Kafka). This parameter determines the number of messages we aim to buffer across all stream partitions consumed by a container. For example, if a container consumes 50 partitions, it will try to buffer [...]
 |systems.**_system-name_**.<br>samza.fetch.threshold.bytes|-1|When consuming streams from Kafka, a Samza container maintains an in-memory buffer for incoming messages in order to increase throughput (the stream task can continue processing buffered messages while new messages are fetched from Kafka). This parameter determines the total size of messages we aim to buffer across all stream partitions consumed by a container based on bytes. Defines how many bytes to use for the buffered pref [...]
 
 #### <a name="hdfs"></a>[3.3 HDFS](#hdfs)
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
index acef057..93ded8b 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -49,7 +49,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
 
   private static final Logger LOG = LoggerFactory.getLogger(KafkaSystemConsumer.class);
 
-  private static final long FETCH_THRESHOLD = 50000;
+  private static final long FETCH_THRESHOLD = 10000;
   private static final long FETCH_THRESHOLD_BYTES = -1L;
 
   protected final Consumer<K, V> kafkaConsumer;
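
The documentation row changed above describes `samza.fetch.threshold` as a message budget shared across all partitions a container consumes. The sketch below shows how such a budget might translate into a per-partition limit. It assumes the budget is split evenly by integer division; the table text is truncated in this email, so that split is an assumption rather than something the diff confirms. The class `FetchThresholdSketch` and the method `perPartitionThreshold` are hypothetical names, not part of Samza.

```java
// Illustrative sketch only; this is not code from the commit or from Samza.
final class FetchThresholdSketch {
  // New default taken from the FETCH_THRESHOLD line in the diff above.
  static final long FETCH_THRESHOLD = 10000;

  // Assumption: the container-wide budget is divided evenly across partitions.
  static long perPartitionThreshold(long fetchThreshold, int numPartitions) {
    if (numPartitions <= 0) {
      throw new IllegalArgumentException("numPartitions must be positive");
    }
    return fetchThreshold / numPartitions;
  }

  public static void main(String[] args) {
    int partitions = 50; // partition count used in the documentation example
    // Old code default (50000) versus the new default (10000).
    System.out.println(perPartitionThreshold(50000, partitions));           // prints 1000
    System.out.println(perPartitionThreshold(FETCH_THRESHOLD, partitions)); // prints 200
  }
}
```

Under that assumption, a container consuming 50 partitions would go from roughly 1000 buffered messages per partition to roughly 200, which matches the commit title's stated goal of reducing per-partition buffer sizes.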