You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2019/07/09 12:50:38 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-733 - Redirect librdkafka log to proper Logger
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 7936e40 MINIFICPP-733 - Redirect librdkafka log to proper Logger
7936e40 is described below
commit 7936e40e872c43b1780984da35a893405fce78a2
Author: Daniel Bakai <ba...@gmail.com>
AuthorDate: Mon Jul 8 13:22:26 2019 +0200
MINIFICPP-733 - Redirect librdkafka log to proper Logger
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #607
---
extensions/librdkafka/PublishKafka.cpp | 5 ++-
extensions/librdkafka/PublishKafka.h | 71 ++++++++++++++++++++++++++++++++--
2 files changed, 71 insertions(+), 5 deletions(-)
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index db4b84b..a1887a2 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -272,7 +272,7 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
}
}
-// Add all of the dynamic properties as librdkafka configurations
+ // Add all of the dynamic properties as librdkafka configurations
const auto &dynamic_prop_keys = context->getDynamicPropertyKeys();
logger_->log_info("PublishKafka registering %d librdkafka dynamic properties", dynamic_prop_keys.size());
@@ -286,6 +286,9 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
}
}
+ // Set the logger callback
+ rd_kafka_conf_set_log_cb(conf_, KafkaConnection::logCallback);
+
auto producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, errstr, sizeof(errstr));
if (!producer) {
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index 726632b..8a23dee 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -27,6 +27,7 @@
#include "core/Resource.h"
#include "core/Property.h"
#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/Logger.h"
#include "rdkafka.h"
#include <regex>
@@ -68,7 +69,9 @@ class KafkaTopic {
~KafkaTopic() {
if (topic_reference_) {
rd_kafka_topic_destroy(topic_reference_);
- topic_reference_ = 0;
+ }
+ if (topic_conf_) {
+ rd_kafka_topic_conf_destroy(topic_conf_);
}
}
@@ -114,7 +117,8 @@ class KafkaConnection {
public:
explicit KafkaConnection(const KafkaConnectionKey &key)
- : conf_(nullptr),
+ : logger_(logging::LoggerFactory<KafkaConnection>::getLogger()),
+ conf_(nullptr),
kafka_connection_(nullptr) {
lease_ = false;
initialized_ = false;
@@ -127,11 +131,23 @@ class KafkaConnection {
void remove() {
topics_.clear();
+ removeConnection();
+ }
+
+ void removeConnection() {
if (kafka_connection_) {
rd_kafka_flush(kafka_connection_, 10 * 1000); /* wait for max 10 seconds */
rd_kafka_destroy(kafka_connection_);
- kafka_connection_ = 0;
+ modifyLoggers([&](std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>>& loggers) {
+ loggers.erase(kafka_connection_);
+ });
+ kafka_connection_ = nullptr;
}
+ if (conf_) {
+ rd_kafka_conf_destroy(conf_);
+ conf_ = nullptr;
+ }
+ initialized_ = false;
}
bool initialized() {
@@ -140,9 +156,13 @@ class KafkaConnection {
void setConnection(rd_kafka_t *producer, rd_kafka_conf_t *conf) {
std::lock_guard<std::mutex> lock(mutex_);
+ removeConnection();
kafka_connection_ = producer;
conf_ = conf;
initialized_ = true;
+ modifyLoggers([&](std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>>& loggers) {
+ loggers[producer] = logger_;
+ });
}
rd_kafka_conf_t *getConf() {
@@ -180,8 +200,43 @@ class KafkaConnection {
topics_.insert(std::make_pair(topicName, topic));
}
+ static void logCallback(const rd_kafka_t* rk, int level, const char* /*fac*/, const char* buf) {
+ std::shared_ptr<logging::Logger> logger;
+ try {
+ modifyLoggers([&](std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>>& loggers) {
+ logger = loggers.at(rk).lock();
+ });
+ } catch (...) {
+ }
+
+ if (!logger) {
+ return;
+ }
+
+ switch (level) {
+ case 0: // LOG_EMERG
+ case 1: // LOG_ALERT
+ case 2: // LOG_CRIT
+ case 3: // LOG_ERR
+ logging::LOG_ERROR(logger) << buf;
+ break;
+ case 4: // LOG_WARNING
+ logging::LOG_WARN(logger) << buf;
+ break;
+ case 5: // LOG_NOTICE
+ case 6: // LOG_INFO
+ logging::LOG_INFO(logger) << buf;
+ break;
+ case 7: // LOG_DEBUG
+ logging::LOG_DEBUG(logger) << buf;
+ break;
+ }
+ }
+
private:
+ std::shared_ptr<logging::Logger> logger_;
+
std::mutex mutex_;
std::atomic<bool> lease_;
@@ -190,10 +245,18 @@ class KafkaConnection {
KafkaConnectionKey key_;
- std::map<std::string, std::shared_ptr<KafkaTopic> > topics_;
+ std::map<std::string, std::shared_ptr<KafkaTopic>> topics_;
rd_kafka_conf_t *conf_;
rd_kafka_t *kafka_connection_;
+
+ static void modifyLoggers(const std::function<void(std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>>&)>& func) {
+ static std::mutex loggers_mutex;
+ static std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>> loggers;
+
+ std::lock_guard<std::mutex> lock(loggers_mutex);
+ func(loggers);
+ }
};
class KafkaPool {