Posted to commits@camel.apache.org by da...@apache.org on 2017/03/12 11:09:30 UTC

[1/3] camel git commit: camel-kafka - Update its default values due to Kafka changes and add a missing option

Repository: camel
Updated Branches:
  refs/heads/master 6a87dad94 -> 99d268430


camel-kafka - Update its default values due to Kafka changes and add a missing option


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bd0dfe88
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bd0dfe88
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bd0dfe88

Branch: refs/heads/master
Commit: bd0dfe885228c159ea2d149a5fc378ad84e02926
Parents: 6a87dad
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Mar 12 11:41:46 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Mar 12 11:41:46 2017 +0100

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          | 11 +++---
 .../component/kafka/KafkaConfiguration.java     | 35 +++++++++++++++-----
 .../springboot/KafkaComponentConfiguration.java |  9 +++++
 3 files changed, 42 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/bd0dfe88/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index ffdb33f..d559d77 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -65,7 +65,7 @@ with the following path and query parameters:
 | topic |  | String | *Required* Name of the topic to use. On the consumer you can use commas to separate multiple topics. A producer can only send a message to a single topic.
 |=======================================================================
 
-#### Query Parameters (81 parameters):
+#### Query Parameters (82 parameters):
 
 [width="100%",cols="2,1,1m,1m,5",options="header"]
 |=======================================================================
@@ -81,7 +81,8 @@ with the following path and query parameters:
 | consumerRequestTimeoutMs | consumer | 40000 | Integer | The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses, the client will resend the request if necessary or fail the request if retries are exhausted.
 | consumersCount | consumer | 1 | int | The number of consumers that connect to the Kafka server
 | consumerStreams | consumer | 10 | int | Number of concurrent consumers on the consumer
-| fetchMinBytes | consumer | 1024 | Integer | The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request.
+| fetchMaxBytes | consumer | 52428800 | Integer | The maximum amount of data the server should return for a fetch request. This is not an absolute maximum: if the first message in the first non-empty partition of the fetch is larger than this value, the message will still be returned to ensure that the consumer can make progress. The maximum message size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel.
+| fetchMinBytes | consumer | 1 | Integer | The minimum amount of data the server should return for a fetch request. If insufficient data is available, the request will wait for that much data to accumulate before answering the request.
 | fetchWaitMaxMs | consumer | 500 | Integer | The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes
 | groupId | consumer |  | String | A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group. This option is required for consumers.
 | heartbeatIntervalMs | consumer | 3000 | Integer | The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.
@@ -92,7 +93,7 @@ with the following path and query parameters:
 | partitionAssignor | consumer | org.apache.kafka.clients.consumer.RangeAssignor | String | The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used
 | pollTimeoutMs | consumer | 5000 | Long | The timeout used when polling the KafkaConsumer.
 | seekTo | consumer |  | String | Set if KafkaConsumer will read from the beginning or the end on startup. beginning: read from the beginning. end: read from the end. This replaces the earlier property seekToBeginning
-| sessionTimeoutMs | consumer | 30000 | Integer | The timeout used to detect failures when using Kafka's group management facilities.
+| sessionTimeoutMs | consumer | 10000 | Integer | The timeout used to detect failures when using Kafka's group management facilities.
 | valueDeserializer | consumer | org.apache.kafka.common.serialization.StringDeserializer | String | Deserializer class for value that implements the Deserializer interface.
 | exceptionHandler | consumer (advanced) |  | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored.
 | exchangePattern | consumer (advanced) |  | ExchangePattern | Sets the exchange pattern when the consumer creates an exchange.
@@ -114,11 +115,11 @@ with the following path and query parameters:
 | partitionKey | producer |  | Integer | The partition to which the record will be sent (or null if no partition was specified). If this option has been configured then it takes precedence over the header KafkaConstants.PARTITION_KEY
 | producerBatchSize | producer | 16384 | Integer | The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt will be made to batch records larger than this size. Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records.
 | queueBufferingMaxMessages | producer | 10000 | Integer | The maximum number of unsent messages that can be queued up by the producer when using async mode before either the producer must be blocked or data must be dropped.
-| receiveBufferBytes | producer | 32768 | Integer | The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.
+| receiveBufferBytes | producer | 65536 | Integer | The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.
 | reconnectBackoffMs | producer | 50 | Integer | The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all requests sent by the consumer to the broker.
 | recordMetadata | producer | true | boolean | Whether the producer should store the RecordMetadata results from sending to Kafka. The results are stored in a List containing the RecordMetadata metadata. The list is stored on a header with the key KafkaConstants.KAFKA_RECORDMETA
 | requestRequiredAcks | producer | 1 | String | The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common: acks=0: the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1. acks=1: the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case, should the leader fail immediately after acknowledging the record but before the followers have replicated it, the record will be lost. acks=all: the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.
-| requestTimeoutMs | producer | 30000 | Integer | The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client.
+| requestTimeoutMs | producer | 305000 | Integer | The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client.
 | retries | producer | 0 | Integer | Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries will potentially change the ordering of records because if two records are sent to a single partition and the first fails and is retried but the second succeeds then the second record may appear first.
 | retryBackoffMs | producer | 100 | Integer | Before each retry the producer refreshes the metadata of relevant topics to see if a new leader has been elected. Since leader election takes a bit of time this property specifies the amount of time that the producer waits before refreshing the metadata.
 | sendBufferBytes | producer | 131072 | Integer | Socket write buffer size
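
For context, the options in the table above are set as endpoint URI query parameters. Below is a minimal consumer route sketch, assuming a local broker and placeholder topic/group names; the option values simply spell out the new defaults:

import org.apache.camel.builder.RouteBuilder;

public class KafkaConsumerRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        // fetchMinBytes, fetchMaxBytes and sessionTimeoutMs map to the Kafka
        // consumer properties fetch.min.bytes, fetch.max.bytes and session.timeout.ms
        from("kafka:mytopic?brokers=localhost:9092&groupId=mygroup"
                + "&fetchMinBytes=1&fetchMaxBytes=52428800&sessionTimeoutMs=10000")
            .to("log:kafka-messages");
    }
}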

http://git-wip-us.apache.org/repos/asf/camel/blob/bd0dfe88/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index dc3ae72..0bf2135 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -71,8 +71,11 @@ public class KafkaConfiguration implements Cloneable {
     @UriParam(label = "consumer", defaultValue = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER)
     private String valueDeserializer = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER;
     //fetch.min.bytes
-    @UriParam(label = "consumer", defaultValue = "1024")
-    private Integer fetchMinBytes = 1024;
+    @UriParam(label = "consumer", defaultValue = "1")
+    private Integer fetchMinBytes = 1;
+    //fetch.max.bytes
+    @UriParam(label = "consumer", defaultValue = "52428800")
+    private Integer fetchMaxBytes = 50 * 1024 * 1024;
     //heartbeat.interval.ms
     @UriParam(label = "consumer", defaultValue = "3000")
     private Integer heartbeatIntervalMs = 3000;
@@ -80,8 +83,8 @@ public class KafkaConfiguration implements Cloneable {
     @UriParam(label = "consumer", defaultValue = "1048576")
     private Integer maxPartitionFetchBytes = 1048576;
     //session.timeout.ms
-    @UriParam(label = "consumer", defaultValue = "30000")
-    private Integer sessionTimeoutMs = 30000;
+    @UriParam(label = "consumer", defaultValue = "10000")
+    private Integer sessionTimeoutMs = 10000;
     @UriParam(label = "consumer", defaultValue = "500")
     private Integer maxPollRecords;
     @UriParam(label = "consumer", defaultValue = "5000")
@@ -167,11 +170,11 @@ public class KafkaConfiguration implements Cloneable {
     @UriParam(label = "producer", defaultValue = "1048576")
     private Integer maxRequestSize = 1048576;
     //receive.buffer.bytes
-    @UriParam(label = "producer", defaultValue = "32768")
-    private Integer receiveBufferBytes = 32768;
+    @UriParam(label = "producer", defaultValue = "65536")
+    private Integer receiveBufferBytes = 65536;
     //request.timeout.ms
-    @UriParam(label = "producer", defaultValue = "30000")
-    private Integer requestTimeoutMs = 30000;
+    @UriParam(label = "producer", defaultValue = "305000")
+    private Integer requestTimeoutMs = 305000;
     //send.buffer.bytes
     @UriParam(label = "producer", defaultValue = "131072")
     private Integer sendBufferBytes = 131072;
@@ -350,6 +353,7 @@ public class KafkaConfiguration implements Cloneable {
         addPropertyIfNotNull(props, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, getKeyDeserializer());
         addPropertyIfNotNull(props, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getValueDeserializer());
         addPropertyIfNotNull(props, ConsumerConfig.FETCH_MIN_BYTES_CONFIG, getFetchMinBytes());
+        addPropertyIfNotNull(props, ConsumerConfig.FETCH_MAX_BYTES_CONFIG, getFetchMaxBytes());
         addPropertyIfNotNull(props, ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, getHeartbeatIntervalMs());
         addPropertyIfNotNull(props, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, getMaxPartitionFetchBytes());
         addPropertyIfNotNull(props, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, getSessionTimeoutMs());
@@ -593,6 +597,21 @@ public class KafkaConfiguration implements Cloneable {
         this.fetchMinBytes = fetchMinBytes;
     }
 
+    /**
+     * The maximum amount of data the server should return for a fetch request.
+     * This is not an absolute maximum: if the first message in the first non-empty partition of the fetch is larger than
+     * this value, the message will still be returned to ensure that the consumer can make progress.
+     * The maximum message size accepted by the broker is defined via message.max.bytes (broker config) or
+     * max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel.
+     */
+    public Integer getFetchMaxBytes() {
+        return fetchMaxBytes;
+    }
+
+    public void setFetchMaxBytes(Integer fetchMaxBytes) {
+        this.fetchMaxBytes = fetchMaxBytes;
+    }
+
     public Integer getFetchWaitMaxMs() {
         return fetchWaitMaxMs;
     }
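
The addPropertyIfNotNull calls in the hunk above translate the configured Camel options into standard Kafka consumer properties. A small sketch of the effect, assuming the surrounding method is KafkaConfiguration's createConsumerProperties() (the hunk does not show its signature) and using an arbitrary example value:

import java.util.Properties;

import org.apache.camel.component.kafka.KafkaConfiguration;

public class FetchMaxBytesDemo {
    public static void main(String[] args) {
        KafkaConfiguration config = new KafkaConfiguration();
        config.setFetchMaxBytes(25 * 1024 * 1024); // arbitrary example value
        Properties props = config.createConsumerProperties();
        // the Camel option surfaces under the native Kafka property name
        System.out.println(props.get("fetch.max.bytes"));
    }
}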

http://git-wip-us.apache.org/repos/asf/camel/blob/bd0dfe88/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index da5d483..93b3cda 100644
--- a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -150,6 +150,7 @@ public class KafkaComponentConfiguration {
          * that much data to accumulate before answering the request.
          */
         private Integer fetchMinBytes;
+        /**
+         * The maximum amount of data the server should return for a fetch
+         * request. This is not an absolute maximum.
+         */
+        private Integer fetchMaxBytes;
         /**
          * The maximum amount of time the server will block before answering the
          * fetch request if there isn't sufficient data to immediately satisfy
@@ -687,6 +688,14 @@ public class KafkaComponentConfiguration {
             this.fetchMinBytes = fetchMinBytes;
         }
 
+        public Integer getFetchMaxBytes() {
+            return fetchMaxBytes;
+        }
+
+        public void setFetchMaxBytes(Integer fetchMaxBytes) {
+            this.fetchMaxBytes = fetchMaxBytes;
+        }
+
         public Integer getFetchWaitMaxMs() {
             return fetchWaitMaxMs;
         }
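
With the getter/setter pair in place, the new option becomes bindable from application.properties like the rest of the nested configuration. A sketch assuming the starter's usual camel.component.kafka.configuration.* prefix and Spring Boot's relaxed property binding:

camel.component.kafka.configuration.fetch-max-bytes=52428800
camel.component.kafka.configuration.fetch-min-bytes=1
camel.component.kafka.configuration.session-timeout-ms=10000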


[3/3] camel git commit: CAMEL-10996: Upgrade kafka to 0.10.2.0

Posted by da...@apache.org.
CAMEL-10996: Upgrade kafka to 0.10.2.0


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/99d26843
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/99d26843
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/99d26843

Branch: refs/heads/master
Commit: 99d268430f80f35c02328e5b2b171d666a8d3dc3
Parents: bf2e963
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Mar 12 12:09:17 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Mar 12 12:09:17 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/camel/component/kafka/embedded/SystemTime.java | 2 +-
 parent/pom.xml                                                     | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/99d26843/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/SystemTime.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/SystemTime.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/SystemTime.java
index 43edda9..a1fe39e 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/SystemTime.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/SystemTime.java
@@ -16,7 +16,7 @@
  */
 package org.apache.camel.component.kafka.embedded;
 
-import kafka.utils.Time;
+import org.apache.kafka.common.utils.Time;
 
 class SystemTime implements Time {
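
The import swap is needed because Kafka 0.10.2 retires the Scala kafka.utils.Time trait in favor of the Java org.apache.kafka.common.utils.Time interface. The hunk only shows the import change, so the method bodies below are a guess at the usual minimal implementation of that interface:

import org.apache.kafka.common.utils.Time;

class SystemTime implements Time {

    @Override
    public long milliseconds() {
        return System.currentTimeMillis();
    }

    @Override
    public long nanoseconds() {
        return System.nanoTime();
    }

    @Override
    public void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            // preserve the interrupt status instead of swallowing it
            Thread.currentThread().interrupt();
        }
    }
}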
 

http://git-wip-us.apache.org/repos/asf/camel/blob/99d26843/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index ee6d05a..acefd4b 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -386,7 +386,7 @@
     <jython-version>2.5.3</jython-version>
     <jzlib-version>1.1.3</jzlib-version>
     <jzlib-bundle-version>1.1.3_2</jzlib-bundle-version>
-    <kafka-version>0.10.1.1</kafka-version>
+    <kafka-version>0.10.2.0</kafka-version>
     <kafka-bundle-version>0.10.1.1_1</kafka-bundle-version>
     <karaf-version>2.4.4</karaf-version>
     <karaf3-version>3.0.8</karaf3-version>


[2/3] camel git commit: CAMEL-10994: camel-kafka - Allow to configure more options on component level

Posted by da...@apache.org.
CAMEL-10994: camel-kafka - Allow to configure more options on component level


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bf2e963a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bf2e963a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bf2e963a

Branch: refs/heads/master
Commit: bf2e963a5907b09fc7b0584690ef22f8aa96972c
Parents: bd0dfe8
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Mar 12 12:08:57 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Mar 12 12:08:57 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/camel/component/kafka/KafkaComponentTest.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/bf2e963a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index 92982cc..5a8af58 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -52,7 +52,7 @@ public class KafkaComponentTest {
         String uri = "kafka:mytopic?partitioner=com.class.Party";
 
         KafkaEndpoint endpoint = (KafkaEndpoint) kafka.createEndpoint(uri);
-        assertEquals(null, endpoint.getConfiguration().getBrokers());
+        assertEquals("broker1:12345,broker2:12566", endpoint.getConfiguration().getBrokers());
         assertEquals("broker1:12345,broker2:12566", endpoint.getComponent().getBrokers());
         assertEquals("mytopic", endpoint.getConfiguration().getTopic());
         assertEquals("com.class.Party", endpoint.getConfiguration().getPartitioner());