You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2019/07/31 21:13:09 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-732 - Add property to expose librdkafka "debug" values for PublishKafka

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new cd0be87  MINIFICPP-732 - Add property to expose librdkafka "debug" values for PublishKafka
cd0be87 is described below

commit cd0be87f36cd348510dd614eb786dac45b394592
Author: Nghia Le <mi...@gmail.com>
AuthorDate: Wed Jul 17 11:23:27 2019 +0200

    MINIFICPP-732 - Add property to expose librdkafka "debug" values for PublishKafka
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #614
---
 extensions/librdkafka/PublishKafka.cpp | 14 +++++++++++---
 extensions/librdkafka/PublishKafka.h   |  1 +
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index a1887a2..75cb55c 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -79,6 +79,7 @@ core::Property PublishKafka::KerberosKeytabPath("Kerberos Keytab Path",
 core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of a field in the Input Records that should be used as the Key for the Kafka message.\n"
                                              "Supports Expression Language: true (will be evaluated using flow file attributes)",
                                              "");
+core::Property PublishKafka::DebugContexts("Debug contexts", "A comma-separated list of debug contexts to enable. Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all", "");
 core::Relationship PublishKafka::Success("success", "Any FlowFile that is successfully sent to Kafka will be routed to this Relationship");
 core::Relationship PublishKafka::Failure("failure", "Any FlowFile that cannot be sent to Kafka will be routed to this Relationship");
 
@@ -107,6 +108,7 @@ void PublishKafka::initialize() {
   properties.insert(KerberosPrincipal);
   properties.insert(KerberosKeytabPath);
   properties.insert(MessageKeyField);
+  properties.insert(DebugContexts);
   setSupportedProperties(properties);
   // Set the supported relationships
   std::set<core::Relationship> relationships;
@@ -129,9 +131,16 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
 
   auto key = conn->getKey();
 
+  if (context->getProperty(DebugContexts.getName(), value) && !value.empty()) {
+    result = rd_kafka_conf_set(conf_, "debug", value.c_str(), errstr, sizeof(errstr));  // keep the status: the check below must test this call, not a stale result
+    logger_->log_debug("PublishKafka: debug properties [%s]", value);
+    if (result != RD_KAFKA_CONF_OK)
+      logger_->log_error("PublishKafka: configure debug properties error result [%s]", errstr);
+  }
+
   if (!key->brokers_.empty()) {
     result = rd_kafka_conf_set(conf_, "bootstrap.servers", key->brokers_.c_str(), errstr, sizeof(errstr));
-    logger_->log_debug("PublishKafka: bootstrap.servers [%s]", value);
+    logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_.c_str());
     if (result != RD_KAFKA_CONF_OK)
       logger_->log_error("PublishKafka: configure error result [%s]", errstr);
   } else {
@@ -141,7 +150,7 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
 
   if (!key->client_id_.empty()) {
     rd_kafka_conf_set(conf_, "client.id", key->client_id_.c_str(), errstr, sizeof(errstr));
-    logger_->log_debug("PublishKafka: client.id [%s]", value);
+    logger_->log_debug("PublishKafka: client.id [%s]", key->client_id_.c_str());
     if (result != RD_KAFKA_CONF_OK)
       logger_->log_error("PublishKafka: configure error result [%s]", errstr);
   } else {
@@ -232,7 +241,6 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
       logger_->log_error("PublishKafka: configure compression codec error result [%s]", errstr);
   }
   value = "";
-  value = "";
   if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) {
     if (value == SECURITY_PROTOCOL_SSL) {
       rd_kafka_conf_set(conf_, "security.protocol", value.c_str(), errstr, sizeof(errstr));
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index 8a23dee..8f6806f 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -338,6 +338,7 @@ class PublishKafka : public core::Processor {
   static core::Property KerberosPrincipal;
   static core::Property KerberosKeytabPath;
   static core::Property MessageKeyField;
+  static core::Property DebugContexts;
 
   // Supported Relationships
   static core::Relationship Failure;