You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2019/07/31 21:13:09 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-732 - Add property to expose librdkafka "debug" values for PublishKafka
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new cd0be87 MINIFICPP-732 - Add property to expose librdkafka "debug" values for PublishKafka
cd0be87 is described below
commit cd0be87f36cd348510dd614eb786dac45b394592
Author: Nghia Le <mi...@gmail.com>
AuthorDate: Wed Jul 17 11:23:27 2019 +0200
MINIFICPP-732 - Add property to expose librdkafka "debug" values for PublishKafka
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #614
---
extensions/librdkafka/PublishKafka.cpp | 14 +++++++++++---
extensions/librdkafka/PublishKafka.h | 1 +
2 files changed, 12 insertions(+), 3 deletions(-)
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index a1887a2..75cb55c 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -79,6 +79,7 @@ core::Property PublishKafka::KerberosKeytabPath("Kerberos Keytab Path",
core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of a field in the Input Records that should be used as the Key for the Kafka message.\n"
"Supports Expression Language: true (will be evaluated using flow file attributes)",
"");
+core::Property PublishKafka::DebugContexts("Debug contexts", "A comma-separated list of debug contexts to enable. Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all", "");
core::Relationship PublishKafka::Success("success", "Any FlowFile that is successfully sent to Kafka will be routed to this Relationship");
core::Relationship PublishKafka::Failure("failure", "Any FlowFile that cannot be sent to Kafka will be routed to this Relationship");
@@ -107,6 +108,7 @@ void PublishKafka::initialize() {
properties.insert(KerberosPrincipal);
properties.insert(KerberosKeytabPath);
properties.insert(MessageKeyField);
+ properties.insert(DebugContexts);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
@@ -129,9 +131,16 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
auto key = conn->getKey();
+ if (context->getProperty(DebugContexts.getName(), value) && !value.empty()) {
+ rd_kafka_conf_set(conf_, "debug", value.c_str(), errstr, sizeof(errstr));
+ logger_->log_debug("PublishKafka: debug properties [%s]", value);
+ if (result != RD_KAFKA_CONF_OK)
+ logger_->log_error("PublishKafka: configure debug properties error result [%s]", errstr);
+ }
+
if (!key->brokers_.empty()) {
result = rd_kafka_conf_set(conf_, "bootstrap.servers", key->brokers_.c_str(), errstr, sizeof(errstr));
- logger_->log_debug("PublishKafka: bootstrap.servers [%s]", value);
+ logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_.c_str());
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
} else {
@@ -141,7 +150,7 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
if (!key->client_id_.empty()) {
rd_kafka_conf_set(conf_, "client.id", key->client_id_.c_str(), errstr, sizeof(errstr));
- logger_->log_debug("PublishKafka: client.id [%s]", value);
+ logger_->log_debug("PublishKafka: client.id [%s]", key->client_id_.c_str());
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
} else {
@@ -232,7 +241,6 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
logger_->log_error("PublishKafka: configure compression codec error result [%s]", errstr);
}
value = "";
- value = "";
if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) {
if (value == SECURITY_PROTOCOL_SSL) {
rd_kafka_conf_set(conf_, "security.protocol", value.c_str(), errstr, sizeof(errstr));
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index 8a23dee..8f6806f 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -338,6 +338,7 @@ class PublishKafka : public core::Processor {
static core::Property KerberosPrincipal;
static core::Property KerberosKeytabPath;
static core::Property MessageKeyField;
+ static core::Property DebugContexts;
// Supported Relationships
static core::Relationship Failure;