You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/11/21 13:47:52 UTC

[camel] branch master updated: CAMEL-12019: camel-kafka - Add option max.poll.interval.ms

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 07b8cf4  CAMEL-12019: camel-kafka - Add option max.poll.interval.ms
07b8cf4 is described below

commit 07b8cf49ecea6865ac977f9d83fcac5355bfc048
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Nov 21 14:43:24 2017 +0100

    CAMEL-12019: camel-kafka - Add option max.poll.interval.ms
---
 .../camel-kafka/src/main/docs/kafka-component.adoc      |  3 ++-
 .../camel/component/kafka/KafkaConfiguration.java       | 17 +++++++++++++++++
 .../kafka/springboot/KafkaComponentConfiguration.java   | 17 +++++++++++++++++
 3 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index ff17998..f33567c 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -69,7 +69,7 @@ with the following path and query parameters:
 | *topic* | *Required* Name of the topic to use. On the consumer you can use comma to separate multiple topics. A producer can only send a message to a single topic. |  | String
 |===
 
-==== Query Parameters (89 parameters):
+==== Query Parameters (90 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
@@ -95,6 +95,7 @@ with the following path and query parameters:
 | *heartbeatIntervalMs* (consumer) | The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. | [...]
 | *keyDeserializer* (consumer) | Deserializer class for key that implements the Deserializer interface. | org.apache.kafka.common.serialization.StringDeserializer | String
 | *maxPartitionFetchBytes* (consumer) | The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be partitions max.partition.fetch.bytes. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens the consumer can get stuck trying to fetch a large message on a certain partition. | 1048576 | Integer
+| *maxPollIntervalMs* (consumer) | The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. |  | Long
 | *maxPollRecords* (consumer) | The maximum number of records returned in a single call to poll() | 500 | Integer
 | *offsetRepository* (consumer) | The offset repository to use in order to locally store the offset of each partition of the topic. Defining one will disable the autocommit. |  | String>
 | *partitionAssignor* (consumer) | The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used | org.apache.kafka.clients.consumer.RangeAssignor | String
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 86cd537..837ca9e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -91,6 +91,8 @@ public class KafkaConfiguration implements Cloneable {
     private Integer maxPollRecords;
     @UriParam(label = "consumer", defaultValue = "5000")
     private Long pollTimeoutMs = 5000L;
+    @UriParam(label = "consumer")
+    private Long maxPollIntervalMs;
     //auto.offset.reset1
     @UriParam(label = "consumer", defaultValue = "latest", enums = "latest,earliest,none")
     private String autoOffsetReset = "latest";
@@ -381,6 +383,7 @@ public class KafkaConfiguration implements Cloneable {
         addPropertyIfNotNull(props, ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, getHeartbeatIntervalMs());
         addPropertyIfNotNull(props, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, getMaxPartitionFetchBytes());
         addPropertyIfNotNull(props, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, getSessionTimeoutMs());
+        addPropertyIfNotNull(props, ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, getMaxPollIntervalMs());
         addPropertyIfNotNull(props, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, getMaxPollRecords());
         addPropertyIfNotNull(props, ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, getInterceptorClasses());
         addPropertyIfNotNull(props, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, getAutoOffsetReset());
@@ -1413,6 +1416,20 @@ public class KafkaConfiguration implements Cloneable {
         this.pollTimeoutMs = pollTimeoutMs;
     }
 
+    public Long getMaxPollIntervalMs() {
+        return maxPollIntervalMs;
+    }
+
+    /**
+     * The maximum delay between invocations of poll() when using consumer group management.
+     * This places an upper bound on the amount of time that the consumer can be idle before fetching more records.
+     * If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group
+     * will rebalance in order to reassign the partitions to another member.
+     */
+    public void setMaxPollIntervalMs(Long maxPollIntervalMs) {
+        this.maxPollIntervalMs = maxPollIntervalMs;
+    }
+
     public String getPartitionAssignor() {
         return partitionAssignor;
     }
diff --git a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index ad6f219..f4e8af5 100644
--- a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -652,6 +652,15 @@ public class KafkaComponentConfiguration
          */
         private Long pollTimeoutMs = 5000L;
         /**
+         * The maximum delay between invocations of poll() when using consumer
+         * group management. This places an upper bound on the amount of time
+         * that the consumer can be idle before fetching more records. If poll()
+         * is not called before expiration of this timeout, then the consumer is
+         * considered failed and the group will rebalance in order to reassign
+         * the partitions to another member.
+         */
+        private Long maxPollIntervalMs;
+        /**
          * The class name of the partition assignment strategy that the client
          * will use to distribute partition ownership amongst consumer instances
          * when group management is used
@@ -1332,6 +1341,14 @@ public class KafkaComponentConfiguration
             this.pollTimeoutMs = pollTimeoutMs;
         }
 
+        public Long getMaxPollIntervalMs() {
+            return maxPollIntervalMs;
+        }
+
+        public void setMaxPollIntervalMs(Long maxPollIntervalMs) {
+            this.maxPollIntervalMs = maxPollIntervalMs;
+        }
+
         public String getPartitionAssignor() {
             return partitionAssignor;
         }

-- 
To stop receiving notification emails like this one, please contact
['"commits@camel.apache.org" <co...@camel.apache.org>'].