Posted to commits@camel.apache.org by da...@apache.org on 2016/07/01 06:56:35 UTC

[5/6] camel git commit: CAMEL-10020: Added option on endpoint

CAMEL-10020: Added option on endpoint


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/466e03c4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/466e03c4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/466e03c4

Branch: refs/heads/master
Commit: 466e03c4ec105693d862428e9a471e16857a0d40
Parents: a81e4d7
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Jul 1 08:42:43 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Jul 1 08:52:28 2016 +0200

----------------------------------------------------------------------
 components/camel-kafka/src/main/docs/kafka.adoc  |  5 ++++-
 .../component/kafka/KafkaConfiguration.java      | 18 ++++++++++++++++++
 .../camel/component/kafka/KafkaProducer.java     | 19 +++++++++++++++++--
 .../camel/component/kafka/KafkaProducerTest.java | 13 ++-----------
 4 files changed, 41 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/466e03c4/components/camel-kafka/src/main/docs/kafka.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka.adoc b/components/camel-kafka/src/main/docs/kafka.adoc
index 2e90613..8a07a13 100644
--- a/components/camel-kafka/src/main/docs/kafka.adoc
+++ b/components/camel-kafka/src/main/docs/kafka.adoc
@@ -85,8 +85,9 @@ The Kafka component supports 1 options which are listed below.
 
 
 
+
 // endpoint options: START
-The Kafka component supports 74 endpoint options which are listed below:
+The Kafka component supports 75 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2s,1,1m,1m,5",options="header"]
@@ -139,6 +140,7 @@ The Kafka component supports 74 endpoint options which are listed below:
 | queueBufferingMaxMessages | producer | 10000 | Integer | The maximum number of unsent messages that can be queued up the producer when using async mode before either the producer must be blocked or data must be dropped.
 | receiveBufferBytes | producer | 32768 | Integer | The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.
 | reconnectBackoffMs | producer | 50 | Integer | The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all requests sent by the consumer to the broker.
+| recordMetadata | producer | true | boolean | Whether the producer should store the RecordMetadata results from sending to Kafka. The results are stored in a List of RecordMetadata entries. The list is stored on a header with the key KafkaConstants#KAFKA_RECORDMETA
 | requestRequiredAcks | producer | 1 | String | The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common: acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1. acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost. acks=all This
  means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.
 | requestTimeoutMs | producer | 30000 | Integer | The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client.
 | retries | producer | 0 | Integer | Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries will potentially change the ordering of records because if two records are sent to a single partition and the first fails and is retried but the second succeeds then the second record may appear first.
@@ -177,6 +179,7 @@ The Kafka component supports 74 endpoint options which are listed below:
 
 
 
+
 For more information about Producer/Consumer configuration:
 
 http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs]
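
For reference, the new recordMetadata option can be set directly on the endpoint URI. The sketch below is illustrative only: the broker address, topic name and class name are made up, and it assumes the brokers-in-the-context-path URI layout used by camel-kafka on this branch.

import org.apache.camel.builder.RouteBuilder;

public class KafkaRecordMetadataRouteSketch extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:start")
            // recordMetadata defaults to true; disable it when the
            // RecordMetadata results are not needed downstream
            .to("kafka:localhost:9092?topic=test&recordMetadata=false");
    }
}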

http://git-wip-us.apache.org/repos/asf/camel/blob/466e03c4/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index c066520..7a7f9d4 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 
@@ -27,6 +28,7 @@ import org.apache.camel.spi.UriPath;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.SslConfigs;
 
@@ -237,6 +239,8 @@ public class KafkaConfiguration {
     //ssl.trustmanager.algorithm
     @UriParam(label = "producer", defaultValue = "PKIX")
     private String sslTrustmanagerAlgorithm = "PKIX";
+    @UriParam(label = "producer", defaultValue = "true")
+    private boolean recordMetadata = true;
 
     public KafkaConfiguration() {
     }
@@ -1202,4 +1206,18 @@ public class KafkaConfiguration {
     public void setWorkerPoolMaxSize(Integer workerPoolMaxSize) {
         this.workerPoolMaxSize = workerPoolMaxSize;
     }
+
+    public boolean isRecordMetadata() {
+        return recordMetadata;
+    }
+
+    /**
+     * Whether the producer should store the {@link RecordMetadata} results from sending to Kafka.
+     *
+     * The results are stored in a {@link List} of {@link RecordMetadata} entries.
+     * The list is stored on a header with the key {@link KafkaConstants#KAFKA_RECORDMETA}.
+     */
+    public void setRecordMetadata(boolean recordMetadata) {
+        this.recordMetadata = recordMetadata;
+    }
 }
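
The same flag can also be flipped programmatically through the endpoint configuration. A minimal sketch, assuming camel-kafka is on the classpath; the broker and topic below are hypothetical, and resolving the endpoint does not require a running broker.

import org.apache.camel.CamelContext;
import org.apache.camel.component.kafka.KafkaEndpoint;
import org.apache.camel.impl.DefaultCamelContext;

public class RecordMetadataConfigSketch {
    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.start();

        // hypothetical broker and topic; the option can equally be set in the URI
        KafkaEndpoint kafka = context.getEndpoint(
                "kafka:localhost:9092?topic=test", KafkaEndpoint.class);
        kafka.getConfiguration().setRecordMetadata(false);
        System.out.println(kafka.getConfiguration().isRecordMetadata()); // false

        context.stop();
    }
}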

http://git-wip-us.apache.org/repos/asf/camel/blob/466e03c4/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 3b838e9..322c22f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -177,7 +177,14 @@ public class KafkaProducer extends DefaultAsyncProducer {
         Iterator<ProducerRecord> c = createRecorder(exchange);
         List<Future<RecordMetadata>> futures = new LinkedList<Future<RecordMetadata>>();
         List<RecordMetadata> recordMetadatas = new ArrayList<RecordMetadata>();
-        exchange.getOut().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
+
+        if (endpoint.getConfiguration().isRecordMetadata()) {
+            if (exchange.hasOut()) {
+                exchange.getOut().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
+            } else {
+                exchange.getIn().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
+            }
+        }
 
         while (c.hasNext()) {
             futures.add(kafkaProducer.send(c.next()));
@@ -216,12 +223,19 @@ public class KafkaProducer extends DefaultAsyncProducer {
         KafkaProducerCallBack(Exchange exchange, AsyncCallback callback) {
             this.exchange = exchange;
             this.callback = callback;
-            exchange.getOut().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
+            if (endpoint.getConfiguration().isRecordMetadata()) {
+                if (exchange.hasOut()) {
+                    exchange.getOut().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
+                } else {
+                    exchange.getIn().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
+                }
+            }
         }
 
         void increment() {
             count.incrementAndGet();
         }
+
         boolean allSent() {
             if (count.decrementAndGet() == 0) {
                 //was able to get all the work done while queuing the requests
@@ -236,6 +250,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
             if (e != null) {
                 exchange.setException(e);
             }
+
             recordMetadatas.add(recordMetadata);
 
             if (count.decrementAndGet() == 0) {
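
When the option is left at its default (true), the list of RecordMetadata is available to the next step in the route under the KafkaConstants.KAFKA_RECORDMETA header. A rough sketch of reading it; the URI, topic and class name are illustrative only and not part of this commit.

import java.util.List;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReadRecordMetadataSketch extends RouteBuilder {

    private static final Logger LOG = LoggerFactory.getLogger(ReadRecordMetadataSketch.class);

    @Override
    public void configure() throws Exception {
        from("direct:start")
            .to("kafka:localhost:9092?topic=test")
            .process(exchange -> {
                // the producer stores one RecordMetadata per record it sent
                @SuppressWarnings("unchecked")
                List<RecordMetadata> metas = exchange.getIn()
                        .getHeader(KafkaConstants.KAFKA_RECORDMETA, List.class);
                if (metas != null) {
                    for (RecordMetadata meta : metas) {
                        LOG.info("sent to {}-{} at offset {}",
                                meta.topic(), meta.partition(), meta.offset());
                    }
                }
            });
    }
}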

http://git-wip-us.apache.org/repos/asf/camel/blob/466e03c4/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 2dcd8f8..c10266e 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -39,7 +39,6 @@ import org.mockito.Mockito;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-
 public class KafkaProducerTest {
 
     private KafkaProducer producer;
@@ -85,8 +84,6 @@ public class KafkaProducerTest {
         producer.process(exchange);
         Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class));
         assertRecordMetadataExists();
-
-
     }
 
     @Test(expected = Exception.class)
@@ -104,7 +101,6 @@ public class KafkaProducerTest {
         assertRecordMetadataExists();
     }
 
-
     @Test
     public void processAsyncSendsMessage() throws Exception {
         endpoint.setTopic("sometopic");
@@ -125,7 +121,6 @@ public class KafkaProducerTest {
 
     @Test
     public void processAsyncSendsMessageWithException() throws Exception {
-
         endpoint.setTopic("sometopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getOut()).thenReturn(out);
@@ -210,7 +205,6 @@ public class KafkaProducerTest {
 
         verifySendMessage("4", "someTopic", "someKey");
         assertRecordMetadataExists();
-
     }
 
     @Test
@@ -224,7 +218,6 @@ public class KafkaProducerTest {
 
         verifySendMessage("someTopic", "someKey");
         assertRecordMetadataExists();
-
     }
 
     @Test
@@ -241,7 +234,6 @@ public class KafkaProducerTest {
 
         verifySendMessage("4", "someTopic", "someKey");
         assertRecordMetadataExists();
-
     }
 
     @Test // Message and Topic Name alone
@@ -254,7 +246,6 @@ public class KafkaProducerTest {
 
         verifySendMessage("someTopic");
         assertRecordMetadataExists();
-
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
@@ -282,9 +273,9 @@ public class KafkaProducerTest {
     }
 
     private void assertRecordMetadataExists() {
-        List<RecordMetadata> recordMetaData1 =  (List<RecordMetadata>)(out.getHeader(KafkaConstants.KAFKA_RECORDMETA));
+        List<RecordMetadata> recordMetaData1 =  (List<RecordMetadata>)(in.getHeader(KafkaConstants.KAFKA_RECORDMETA));
         assertTrue(recordMetaData1 != null);
         assertEquals("Expected one recordMetaData",recordMetaData1.size(),1);
-        assertTrue(recordMetaData1.get(0) !=null);
+        assertTrue(recordMetaData1.get(0) != null);
     }
 }