You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/08/01 14:27:33 UTC

camel git commit: CAMEL-10213: Add maxPollRecords option. Thanks to Oliver Holzmann for reporting.

Repository: camel
Updated Branches:
  refs/heads/master a3551e3f1 -> dc7770149


CAMEL-10213: Add maxPollRecords option. Thanks to Oliver Holzmann for reporting.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dc777014
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dc777014
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dc777014

Branch: refs/heads/master
Commit: dc77701493d993621ec20c835ddeba62cfaff9fe
Parents: a3551e3
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Aug 1 16:27:01 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Aug 1 16:27:01 2016 +0200

----------------------------------------------------------------------
 components/camel-kafka/src/main/docs/kafka.adoc     |  9 +++++++--
 .../camel/component/kafka/KafkaConfiguration.java   | 16 ++++++++++++++++
 .../apache/camel/component/kafka/KafkaEndpoint.java | 11 ++++++++++-
 3 files changed, 33 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/dc777014/components/camel-kafka/src/main/docs/kafka.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka.adoc b/components/camel-kafka/src/main/docs/kafka.adoc
index fde33e3..d8753d2 100644
--- a/components/camel-kafka/src/main/docs/kafka.adoc
+++ b/components/camel-kafka/src/main/docs/kafka.adoc
@@ -91,15 +91,16 @@ The Kafka component supports 1 options which are listed below.
 
 
 
+
+
 // endpoint options: START
-The Kafka component supports 75 endpoint options which are listed below:
+The Kafka component supports 76 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2s,1,1m,1m,5",options="header"]
 |=======================================================================
 | Name | Group | Default | Java Type | Description
 | brokers | common |  | String | *Required* This is for bootstrapping and the producer will only use it for getting metadata (topics partitions and replicas). The socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1host2:port2 and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as metadata.broker.list in the Kafka documentation.
-| bridgeEndpoint | common | false | boolean | If the option is true then KafkaProducer will ignore the KafkaConstants.TOPIC header setting of the inbound message.
 | clientId | common |  | String | The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.
 | groupId | common |  | String | A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group.
 | kerberosBeforeReloginMinTime | common | 60000 | Integer | Login thread sleep time between refresh attempts.
@@ -123,12 +124,14 @@ The Kafka component supports 75 endpoint options which are listed below:
 | heartbeatIntervalMs | consumer | 3000 | Integer | The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.
 | keyDeserializer | consumer | org.apache.kafka.common.serialization.StringDeserializer | String | Deserializer class for key that implements the Deserializer interface.
 | maxPartitionFetchBytes | consumer | 1048576 | Integer | The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be partitions max.partition.fetch.bytes. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens the consumer can get stuck trying to fetch a large message on a certain partition.
+| maxPollRecords | consumer | 2147483647 | Integer | A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using subscribe(topic) or the Kafka-based offset management strategy.
 | partitionAssignor | consumer | org.apache.kafka.clients.consumer.RangeAssignor | String | The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used
 | pollTimeoutMs | consumer | 5000 | Long | The timeout used when polling the KafkaConsumer.
 | seekToBeginning | consumer | false | boolean | If the option is true then KafkaConsumer will read from beginning on startup.
 | sessionTimeoutMs | consumer | 30000 | Integer | The timeout used to detect failures when using Kafka's group management facilities.
 | valueDeserializer | consumer | org.apache.kafka.common.serialization.StringDeserializer | String | Deserializer class for value that implements the Deserializer interface.
 | exceptionHandler | consumer (advanced) |  | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN/ERROR level and ignored.
+| bridgeEndpoint | producer | false | boolean | If the option is true then KafkaProducer will ignore the KafkaConstants.TOPIC header setting of the inbound message.
 | bufferMemorySize | producer | 33554432 | Integer | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will either block or throw an exception based on the preference specified by block.on.buffer.full.This setting should correspond roughly to the total memory the producer will use but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.
 | compressionCodec | producer | none | String | This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are none gzip and snappy.
 | connectionMaxIdleMs | producer | 540000 | Integer | Close idle connections after the number of milliseconds specified by this config.
@@ -190,6 +193,8 @@ The Kafka component supports 75 endpoint options which are listed below:
 
 
 
+
+
 For more information about Producer/Consumer configuration:
 
 http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs]

http://git-wip-us.apache.org/repos/asf/camel/blob/dc777014/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index a530795..deb12a1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -71,6 +71,8 @@ public class KafkaConfiguration {
     //session.timeout.ms
     @UriParam(label = "consumer", defaultValue = "30000")
     private Integer sessionTimeoutMs = 30000;
+    @UriParam(label = "consumer", defaultValue = "2147483647")
+    private Integer maxPollRecords;
     @UriParam(label = "consumer", defaultValue = "5000")
     private Long pollTimeoutMs = 5000L;
     //auto.offset.reset1
@@ -310,6 +312,7 @@ public class KafkaConfiguration {
         addPropertyIfNotNull(props, ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, getHeartbeatIntervalMs());
         addPropertyIfNotNull(props, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, getMaxPartitionFetchBytes());
         addPropertyIfNotNull(props, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, getSessionTimeoutMs());
+        addPropertyIfNotNull(props, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, getMaxPollRecords());
         // SSL
         addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword());
         addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getSslKeystoreLocation());
@@ -1091,6 +1094,19 @@ public class KafkaConfiguration {
         this.sessionTimeoutMs = sessionTimeoutMs;
     }
 
+    public Integer getMaxPollRecords() {
+        return maxPollRecords;
+    }
+
+    /**
+     * A unique string that identifies the consumer group this consumer belongs to.
+     * This property is required if the consumer uses either the group management functionality by using
+     * <code>subscribe(topic)</code> or the Kafka-based offset management strategy.
+     */
+    public void setMaxPollRecords(Integer maxPollRecords) {
+        this.maxPollRecords = maxPollRecords;
+    }
+
     public Long getPollTimeoutMs() {
         return pollTimeoutMs;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/dc777014/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index e918bfd..ff0028e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -51,7 +51,7 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
     
     @UriParam
     private KafkaConfiguration configuration = new KafkaConfiguration();
-    @UriParam
+    @UriParam(label = "producer")
     private boolean bridgeEndpoint;
 
     public KafkaEndpoint() {
@@ -744,4 +744,13 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
     public void setWorkerPoolCoreSize(Integer workerPoolCoreSize) {
         configuration.setWorkerPoolCoreSize(workerPoolCoreSize);
     }
+
+    public Integer getMaxPollRecords() {
+        return configuration.getMaxPollRecords();
+    }
+
+    public void setMaxPollRecords(Integer maxPollRecords) {
+        configuration.setMaxPollRecords(maxPollRecords);
+    }
+
 }