You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2019/07/09 12:50:38 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-733 - Redirect librdkafka log to proper Logger

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 7936e40  MINIFICPP-733 - Redirect librdkafka log to proper Logger
7936e40 is described below

commit 7936e40e872c43b1780984da35a893405fce78a2
Author: Daniel Bakai <ba...@gmail.com>
AuthorDate: Mon Jul 8 13:22:26 2019 +0200

    MINIFICPP-733 - Redirect librdkafka log to proper Logger
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #607
---
 extensions/librdkafka/PublishKafka.cpp |  5 ++-
 extensions/librdkafka/PublishKafka.h   | 71 ++++++++++++++++++++++++++++++++--
 2 files changed, 71 insertions(+), 5 deletions(-)

diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index db4b84b..a1887a2 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -272,7 +272,7 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
     }
   }
 
-// Add all of the dynamic properties as librdkafka configurations
+  // Add all of the dynamic properties as librdkafka configurations
   const auto &dynamic_prop_keys = context->getDynamicPropertyKeys();
   logger_->log_info("PublishKafka registering %d librdkafka dynamic properties", dynamic_prop_keys.size());
 
@@ -286,6 +286,9 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
     }
   }
 
+  // Set the logger callback
+  rd_kafka_conf_set_log_cb(conf_, KafkaConnection::logCallback);
+
   auto producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, errstr, sizeof(errstr));
 
   if (!producer) {
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index 726632b..8a23dee 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -27,6 +27,7 @@
 #include "core/Resource.h"
 #include "core/Property.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "core/logging/Logger.h"
 #include "rdkafka.h"
 #include <regex>
 
@@ -68,7 +69,9 @@ class KafkaTopic {
   ~KafkaTopic() {
     if (topic_reference_) {
       rd_kafka_topic_destroy(topic_reference_);
-      topic_reference_ = 0;
+    }
+    if (topic_conf_) {
+      rd_kafka_topic_conf_destroy(topic_conf_);
     }
   }
 
@@ -114,7 +117,8 @@ class KafkaConnection {
  public:
 
   explicit KafkaConnection(const KafkaConnectionKey &key)
-      : conf_(nullptr),
+      : logger_(logging::LoggerFactory<KafkaConnection>::getLogger()),
+        conf_(nullptr),
         kafka_connection_(nullptr) {
     lease_ = false;
     initialized_ = false;
@@ -127,11 +131,23 @@ class KafkaConnection {
 
   void remove() {
     topics_.clear();
+    removeConnection();
+  }
+
+  void removeConnection() {
     if (kafka_connection_) {
       rd_kafka_flush(kafka_connection_, 10 * 1000); /* wait for max 10 seconds */
       rd_kafka_destroy(kafka_connection_);
-      kafka_connection_ = 0;
+      modifyLoggers([&](std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>>& loggers) {
+        loggers.erase(kafka_connection_);
+      });
+      kafka_connection_ = nullptr;
     }
+    if (conf_) {
+      rd_kafka_conf_destroy(conf_);
+      conf_ = nullptr;
+    }
+    initialized_ = false;
   }
 
   bool initialized() {
@@ -140,9 +156,13 @@ class KafkaConnection {
 
   void setConnection(rd_kafka_t *producer, rd_kafka_conf_t *conf) {
     std::lock_guard<std::mutex> lock(mutex_);
+    removeConnection();
     kafka_connection_ = producer;
     conf_ = conf;
     initialized_ = true;
+    modifyLoggers([&](std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>>& loggers) {
+      loggers[producer] = logger_;
+    });
   }
 
   rd_kafka_conf_t *getConf() {
@@ -180,8 +200,43 @@ class KafkaConnection {
     topics_.insert(std::make_pair(topicName, topic));
   }
 
+  static void logCallback(const rd_kafka_t* rk, int level, const char* /*fac*/, const char* buf) {
+    std::shared_ptr<logging::Logger> logger;
+    try {
+      modifyLoggers([&](std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>>& loggers) {
+        logger = loggers.at(rk).lock();
+      });
+    } catch (...) {
+    }
+
+    if (!logger) {
+      return;
+    }
+
+    switch (level) {
+      case 0: // LOG_EMERG
+      case 1: // LOG_ALERT
+      case 2: // LOG_CRIT
+      case 3: // LOG_ERR
+        logging::LOG_ERROR(logger) << buf;
+        break;
+      case 4: // LOG_WARNING
+        logging::LOG_WARN(logger) << buf;
+        break;
+      case 5: // LOG_NOTICE
+      case 6: // LOG_INFO
+        logging::LOG_INFO(logger) << buf;
+        break;
+      case 7: // LOG_DEBUG
+        logging::LOG_DEBUG(logger) << buf;
+        break;
+    }
+  }
+
  private:
 
+  std::shared_ptr<logging::Logger> logger_;
+
   std::mutex mutex_;
 
   std::atomic<bool> lease_;
@@ -190,10 +245,18 @@ class KafkaConnection {
 
   KafkaConnectionKey key_;
 
-  std::map<std::string, std::shared_ptr<KafkaTopic> > topics_;
+  std::map<std::string, std::shared_ptr<KafkaTopic>> topics_;
 
   rd_kafka_conf_t *conf_;
   rd_kafka_t *kafka_connection_;
+
+  static void modifyLoggers(const std::function<void(std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>>&)>& func) {
+    static std::mutex loggers_mutex;
+    static std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>> loggers;
+
+    std::lock_guard<std::mutex> lock(loggers_mutex);
+    func(loggers);
+  }
 };
 
 class KafkaPool {