You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2016/05/20 12:21:14 UTC

camel git commit: CAMEL-9978: Camel-Kafka: configuration type mismatch for parameter acks

Repository: camel
Updated Branches:
  refs/heads/master 411f51f27 -> 8b9791f8b


CAMEL-9978: Camel-Kafka: configuration type mismatch for parameter acks


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8b9791f8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8b9791f8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8b9791f8

Branch: refs/heads/master
Commit: 8b9791f8b4b1154d6a053de901a5efd2105409e5
Parents: 411f51f
Author: Andrea Cosentino <an...@gmail.com>
Authored: Fri May 20 14:17:53 2016 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Fri May 20 14:17:53 2016 +0200

----------------------------------------------------------------------
 components/camel-kafka/src/main/docs/kafka.adoc                | 4 +++-
 .../org/apache/camel/component/kafka/KafkaConfiguration.java   | 6 +++---
 .../java/org/apache/camel/component/kafka/KafkaEndpoint.java   | 4 ++--
 .../org/apache/camel/component/kafka/KafkaComponentTest.java   | 4 ++--
 4 files changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8b9791f8/components/camel-kafka/src/main/docs/kafka.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka.adoc b/components/camel-kafka/src/main/docs/kafka.adoc
index 2811a62..2e90613 100644
--- a/components/camel-kafka/src/main/docs/kafka.adoc
+++ b/components/camel-kafka/src/main/docs/kafka.adoc
@@ -84,6 +84,7 @@ The Kafka component supports 1 options which are listed below.
 
 
 
+
 // endpoint options: START
 The Kafka component supports 74 endpoint options which are listed below:
 
@@ -138,7 +139,7 @@ The Kafka component supports 74 endpoint options which are listed below:
 | queueBufferingMaxMessages | producer | 10000 | Integer | The maximum number of unsent messages that can be queued up the producer when using async mode before either the producer must be blocked or data must be dropped.
 | receiveBufferBytes | producer | 32768 | Integer | The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.
 | reconnectBackoffMs | producer | 50 | Integer | The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all requests sent by the consumer to the broker.
-| requestRequiredAcks | producer | 1 | Integer | The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common: acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1. acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost. acks=all Thi
 s means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.
+| requestRequiredAcks | producer | 1 | String | The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common: acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1. acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost. acks=all This
  means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.
 | requestTimeoutMs | producer | 30000 | Integer | The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client.
 | retries | producer | 0 | Integer | Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries will potentially change the ordering of records because if two records are sent to a single partition and the first fails and is retried but the second succeeds then the second record may appear first.
 | retryBackoffMs | producer | 100 | Integer | Before each retry the producer refreshes the metadata of relevant topics to see if a new leader has been elected. Since leader election takes a bit of time this property specifies the amount of time that the producer waits before refreshing the metadata.
@@ -175,6 +176,7 @@ The Kafka component supports 74 endpoint options which are listed below:
 
 
 
+
 For more information about Producer/Consumer configuration:
 
 http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs]

http://git-wip-us.apache.org/repos/asf/camel/blob/8b9791f8/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 1a068c3..9e3b39d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -120,7 +120,7 @@ public class KafkaConfiguration {
     private String keySerializerClass;
 
     @UriParam(label = "producer", defaultValue = "1")
-    private Integer requestRequiredAcks = 1;
+    private String requestRequiredAcks = "1";
     //buffer.memory
     @UriParam(label = "producer", defaultValue = "33554432")
     private Integer bufferMemorySize = 33554432;
@@ -867,7 +867,7 @@ public class KafkaConfiguration {
         this.bufferMemorySize = bufferMemorySize;
     }
 
-    public Integer getRequestRequiredAcks() {
+    public String getRequestRequiredAcks() {
         return requestRequiredAcks;
     }
 
@@ -884,7 +884,7 @@ public class KafkaConfiguration {
      * acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the
      * record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.
      */
-    public void setRequestRequiredAcks(Integer requestRequiredAcks) {
+    public void setRequestRequiredAcks(String requestRequiredAcks) {
         this.requestRequiredAcks = requestRequiredAcks;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/8b9791f8/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index eb03493..ec75c4b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -419,7 +419,7 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         configuration.setSslKeyPassword(sslKeyPassword);
     }
 
-    public Integer getRequestRequiredAcks() {
+    public String getRequestRequiredAcks() {
         return configuration.getRequestRequiredAcks();
     }
 
@@ -479,7 +479,7 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         return configuration.getSslCipherSuites();
     }
 
-    public void setRequestRequiredAcks(Integer requestRequiredAcks) {
+    public void setRequestRequiredAcks(String requestRequiredAcks) {
         configuration.setRequestRequiredAcks(requestRequiredAcks);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/8b9791f8/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index 1c2c564..6a3773a 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -59,7 +59,7 @@ public class KafkaComponentTest {
 
         KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params);
 
-        assertEquals(new Integer(0), endpoint.getRequestRequiredAcks());
+        assertEquals("1", endpoint.getRequestRequiredAcks());
         assertEquals(new Integer(1), endpoint.getBufferMemorySize());
         assertEquals(new Integer(10), endpoint.getProducerBatchSize());
         assertEquals(new Integer(12), endpoint.getConnectionMaxIdleMs());
@@ -155,7 +155,7 @@ public class KafkaComponentTest {
     }
 
     private void setProducerProperty(Map<String, Object> params) {
-        params.put("requestRequiredAcks", 0);
+        params.put("requestRequiredAcks", "1");
         params.put("bufferMemorySize", 1);
         params.put("compressionCodec", "none");
         params.put("retries", 0);