You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/04/24 11:34:41 UTC
[3/3] camel git commit: Add some logging in kafka producer when it
sends/has all sent etc.
Add some logging in kafka producer when it sends/has all sent etc.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6243402b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6243402b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6243402b
Branch: refs/heads/master
Commit: 6243402b2291af99c7f7b950f5e905489d403074
Parents: 8b5e93e
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Apr 24 13:29:33 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Apr 24 13:29:33 2017 +0200
----------------------------------------------------------------------
.../apache/camel/component/kafka/KafkaProducer.java | 14 ++++++++++++--
1 file changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/6243402b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index ede3d3e..e3b556b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -232,7 +232,11 @@ public class KafkaProducer extends DefaultAsyncProducer {
}
while (c.hasNext()) {
- futures.add(kafkaProducer.send(c.next()));
+ ProducerRecord rec = c.next();
+ if (log.isDebugEnabled()) {
+ log.debug("Sending message to topic: {}, partition: {}, key: {}", rec.topic(), rec.partition(), rec.key());
+ }
+ futures.add(kafkaProducer.send(rec));
}
for (Future<RecordMetadata> f : futures) {
//wait for them all to be sent
@@ -248,7 +252,11 @@ public class KafkaProducer extends DefaultAsyncProducer {
KafkaProducerCallBack cb = new KafkaProducerCallBack(exchange, callback);
while (c.hasNext()) {
cb.increment();
- kafkaProducer.send(c.next(), cb);
+ ProducerRecord rec = c.next();
+ if (log.isDebugEnabled()) {
+ log.debug("Sending message to topic: {}, partition: {}, key: {}", rec.topic(), rec.partition(), rec.key());
+ }
+ kafkaProducer.send(rec, cb);
}
return cb.allSent();
} catch (Exception ex) {
@@ -306,6 +314,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
boolean allSent() {
if (count.decrementAndGet() == 0) {
+ log.trace("All messages sent, continue routing.");
//was able to get all the work done while queuing the requests
callback.done(true);
return true;
@@ -327,6 +336,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
workerPool.submit(new Runnable() {
@Override
public void run() {
+ log.trace("All messages sent, continue routing.");
callback.done(false);
}
});