You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2019/02/19 11:15:22 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-730: Kerberos
support for PublishKafka and underlying librdkafka library
This is an automated email from the ASF dual-hosted git repository.
phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 745d0db MINIFICPP-730: Kerberos support for PublishKafka and underlying librdkafka library
745d0db is described below
commit 745d0dbe5c6bae442d31639f9df8d6198d8ed289
Author: Jeremy Dyer <je...@apache.org>
AuthorDate: Sat Feb 9 18:53:15 2019 -0500
MINIFICPP-730: Kerberos support for PublishKafka and underlying librdkafka library
Updates for the Dynamic Properties that can be set and make sure they are passed to librdkafka configuration
MINIFICPP-731: Kerberos support for PublishKafka
This closes #485.
Signed-off-by: Marc Parisi <ph...@apache.org>
---
extensions/librdkafka/PublishKafka.cpp | 60 ++++++++++++++++++++++++++++++----
extensions/librdkafka/PublishKafka.h | 5 +++
2 files changed, 59 insertions(+), 6 deletions(-)
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index 656eb32..1b30eb8 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -53,6 +53,11 @@ core::Property PublishKafka::SecurityCA("Security CA", "File or directory path t
core::Property PublishKafka::SecurityCert("Security Cert", "Path to client's public key (PEM) used for authentication", "");
core::Property PublishKafka::SecurityPrivateKey("Security Private Key", "Path to client's private key (PEM) used for authentication", "");
core::Property PublishKafka::SecurityPrivateKeyPassWord("Security Pass Phrase", "Private key passphrase", "");
+core::Property PublishKafka::KerberosServiceName("Kerberos Service Name", "Kerberos Service Name", "");
+core::Property PublishKafka::KerberosPrincipal("Kerberos Principal", "Kerberos Principal", "");
+core::Property PublishKafka::KerberosKeytabPath("Kerberos Keytab Path", "The path to the location on the local filesystem where the kerberos keytab is located. Read permission on the file is required.", "");
+core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of a field in the Input Records that should be used as the Key for the Kafka message.\n"
+ "Supports Expression Language: true (will be evaluated using flow file attributes)", "");
core::Relationship PublishKafka::Success("success", "Any FlowFile that is successfully sent to Kafka will be routed to this Relationship");
core::Relationship PublishKafka::Failure("failure", "Any FlowFile that cannot be sent to Kafka will be routed to this Relationship");
@@ -77,6 +82,10 @@ void PublishKafka::initialize() {
properties.insert(SecurityCert);
properties.insert(SecurityPrivateKey);
properties.insert(SecurityPrivateKeyPassWord);
+ properties.insert(KerberosServiceName);
+ properties.insert(KerberosPrincipal);
+ properties.insert(KerberosKeytabPath);
+ properties.insert(MessageKeyField);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
@@ -95,6 +104,29 @@ void PublishKafka::onSchedule(core::ProcessContext *context, core::ProcessSessio
conf_ = rd_kafka_conf_new();
topic_conf_ = rd_kafka_topic_conf_new();
+
+ // Kerberos configuration
+ if (context->getProperty(KerberosServiceName.getName(), value) && !value.empty()) {
+ result = rd_kafka_conf_set(conf_, "sasl.kerberos.service.name", value.c_str(), errstr, sizeof(errstr));
+ logger_->log_debug("PublishKafka: sasl.kerberos.service.name [%s]", value);
+ if (result != RD_KAFKA_CONF_OK)
+ logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+ }
+ value = "";
+ if (context->getProperty(KerberosPrincipal.getName(), value) && !value.empty()) {
+ result = rd_kafka_conf_set(conf_, "sasl.kerberos.principal", value.c_str(), errstr, sizeof(errstr));
+ logger_->log_debug("PublishKafka: sasl.kerberos.principal [%s]", value);
+ if (result != RD_KAFKA_CONF_OK)
+ logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+ }
+ value = "";
+ if (context->getProperty(KerberosKeytabPath.getName(), value) && !value.empty()) {
+ result = rd_kafka_conf_set(conf_, "sasl.kerberos.keytab", value.c_str(), errstr, sizeof(errstr));
+ logger_->log_debug("PublishKafka: sasl.kerberos.keytab [%s]", value);
+ if (result != RD_KAFKA_CONF_OK)
+ logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+ }
+ value = "";
if (context->getProperty(SeedBrokers.getName(), value) && !value.empty()) {
result = rd_kafka_conf_set(conf_, "bootstrap.servers", value.c_str(), errstr, sizeof(errstr));
logger_->log_debug("PublishKafka: bootstrap.servers [%s]", value);
@@ -234,11 +266,25 @@ void PublishKafka::onSchedule(core::ProcessContext *context, core::ProcessSessio
return;
}
+ // Add all of the dynamic properties as librdkafka configurations
+ const auto &dynamic_prop_keys = context->getDynamicPropertyKeys();
+ logger_->log_info("PublishKafka registering %d librdkafka dynamic properties", dynamic_prop_keys.size());
+
+ for (const auto &key : dynamic_prop_keys) {
+ value = "";
+ if (context->getDynamicProperty(key, value) && !value.empty()) {
+ logger_->log_debug("PublishKafka: DynamicProperty: [%s] -> [%s]", key, value);
+ rd_kafka_conf_set(conf_, key.c_str(), value.c_str(), errstr, sizeof(errstr));
+ } else {
+ logger_->log_warn("PublishKafka Dynamic Property '%s' is empty and therefore will not be configured", key);
+ }
+ }
+
rk_= rd_kafka_new(RD_KAFKA_PRODUCER, conf_,
errstr, sizeof(errstr));
if (!rk_) {
- logger_->log_error("Failed to create kafak producer %s", errstr);
+ logger_->log_error("Failed to create Kafka producer %s", errstr);
return;
}
@@ -263,11 +309,13 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
return;
}
- std::string kafkaKey = flowFile->getUUIDStr();;
- std::string value;
-
- if (flowFile->getAttribute(KAFKA_KEY_ATTRIBUTE, value))
- kafkaKey = value;
+ std::string kafkaKey;
+ kafkaKey = "";
+ if (context->getDynamicProperty(MessageKeyField, kafkaKey, flowFile) && !kafkaKey.empty()) {
+ logger_->log_debug("PublishKafka: Message Key Field [%s]", kafkaKey);
+ } else {
+ kafkaKey = flowFile->getUUIDStr();
+ }
PublishKafka::ReadCallback callback(max_seg_size_, kafkaKey, rkt_, rk_, flowFile, attributeNameRegex);
session->read(flowFile, &callback);
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index 46915b9..f31df86 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -96,6 +96,10 @@ public:
static core::Property SecurityCert;
static core::Property SecurityPrivateKey;
static core::Property SecurityPrivateKeyPassWord;
+ static core::Property KerberosServiceName;
+ static core::Property KerberosPrincipal;
+ static core::Property KerberosKeytabPath;
+ static core::Property MessageKeyField;
// Supported Relationships
static core::Relationship Failure;
@@ -187,6 +191,7 @@ public:
* ProcessSession objects.
*/
void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+
// OnTrigger method, implemented by NiFi PublishKafka
virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
}