You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/04/24 11:34:41 UTC

[3/3] camel git commit: Add some logging in kafka producer when it sends/has all sent etc.

Add some logging in kafka producer when it sends/has all sent etc.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6243402b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6243402b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6243402b

Branch: refs/heads/master
Commit: 6243402b2291af99c7f7b950f5e905489d403074
Parents: 8b5e93e
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Apr 24 13:29:33 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Apr 24 13:29:33 2017 +0200

----------------------------------------------------------------------
 .../apache/camel/component/kafka/KafkaProducer.java   | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/6243402b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index ede3d3e..e3b556b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -232,7 +232,11 @@ public class KafkaProducer extends DefaultAsyncProducer {
         }
 
         while (c.hasNext()) {
-            futures.add(kafkaProducer.send(c.next()));
+            ProducerRecord rec = c.next();
+            if (log.isDebugEnabled()) {
+                log.debug("Sending message to topic: {}, partition: {}, key: {}", rec.topic(), rec.partition(), rec.key());
+            }
+            futures.add(kafkaProducer.send(rec));
         }
         for (Future<RecordMetadata> f : futures) {
             //wait for them all to be sent
@@ -248,7 +252,11 @@ public class KafkaProducer extends DefaultAsyncProducer {
             KafkaProducerCallBack cb = new KafkaProducerCallBack(exchange, callback);
             while (c.hasNext()) {
                 cb.increment();
-                kafkaProducer.send(c.next(), cb);
+                ProducerRecord rec = c.next();
+                if (log.isDebugEnabled()) {
+                    log.debug("Sending message to topic: {}, partition: {}, key: {}", rec.topic(), rec.partition(), rec.key());
+                }
+                kafkaProducer.send(rec, cb);
             }
             return cb.allSent();
         } catch (Exception ex) {
@@ -306,6 +314,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
 
         boolean allSent() {
             if (count.decrementAndGet() == 0) {
+                log.trace("All messages sent, continue routing.");
                 //was able to get all the work done while queuing the requests
                 callback.done(true);
                 return true;
@@ -327,6 +336,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
                 workerPool.submit(new Runnable() {
                     @Override
                     public void run() {
+                        log.trace("All messages sent, continue routing.");
                         callback.done(false);
                     }
                 });