You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2019/02/19 11:15:22 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-730: Kerberos support for PublishKafka and underlying librdkafka library

This is an automated email from the ASF dual-hosted git repository.

phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 745d0db  MINIFICPP-730: Kerberos support for PublishKafka and underlying librdkafka library
745d0db is described below

commit 745d0dbe5c6bae442d31639f9df8d6198d8ed289
Author: Jeremy Dyer <je...@apache.org>
AuthorDate: Sat Feb 9 18:53:15 2019 -0500

    MINIFICPP-730: Kerberos support for PublishKafka and underlying librdkafka library
    
    Updates for the Dynmaic Properties that can be set and make sure they are passed to librdkafka configuration
    
    MINIFICPP-731: Kerberos support for PublishKafka
    
    This closes #485.
    
    Signed-off-by: Marc Parisi <ph...@apache.org>
---
 extensions/librdkafka/PublishKafka.cpp | 60 ++++++++++++++++++++++++++++++----
 extensions/librdkafka/PublishKafka.h   |  5 +++
 2 files changed, 59 insertions(+), 6 deletions(-)

diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index 656eb32..1b30eb8 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -53,6 +53,11 @@ core::Property PublishKafka::SecurityCA("Security CA", "File or directory path t
 core::Property PublishKafka::SecurityCert("Security Cert", "Path to client's public key (PEM) used for authentication", "");
 core::Property PublishKafka::SecurityPrivateKey("Security Private Key", "Path to client's private key (PEM) used for authentication", "");
 core::Property PublishKafka::SecurityPrivateKeyPassWord("Security Pass Phrase", "Private key passphrase", "");
+core::Property PublishKafka::KerberosServiceName("Kerberos Service Name", "Kerberos Service Name", "");
+core::Property PublishKafka::KerberosPrincipal("Kerberos Principal", "Keberos Principal", "");
+core::Property PublishKafka::KerberosKeytabPath("Kerberos Keytab Path", "The path to the location on the local filesystem where the kerberos keytab is located. Read permission on the file is required.", "");
+core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of a field in the Input Records that should be used as the Key for the Kafka message.\n"
+                                                                  "Supports Expression Language: true (will be evaluated using flow file attributes)", "");
 core::Relationship PublishKafka::Success("success", "Any FlowFile that is successfully sent to Kafka will be routed to this Relationship");
 core::Relationship PublishKafka::Failure("failure", "Any FlowFile that cannot be sent to Kafka will be routed to this Relationship");
 
@@ -77,6 +82,10 @@ void PublishKafka::initialize() {
   properties.insert(SecurityCert);
   properties.insert(SecurityPrivateKey);
   properties.insert(SecurityPrivateKeyPassWord);
+  properties.insert(KerberosServiceName);
+  properties.insert(KerberosPrincipal);
+  properties.insert(KerberosKeytabPath);
+  properties.insert(MessageKeyField);
   setSupportedProperties(properties);
   // Set the supported relationships
   std::set<core::Relationship> relationships;
@@ -95,6 +104,29 @@ void PublishKafka::onSchedule(core::ProcessContext *context, core::ProcessSessio
   conf_ = rd_kafka_conf_new();
   topic_conf_ = rd_kafka_topic_conf_new();
 
+
+  // Kerberos configuration
+  if (context->getProperty(KerberosServiceName.getName(), value) && !value.empty()) {
+    result = rd_kafka_conf_set(conf_, "sasl.kerberos.service.name", value.c_str(), errstr, sizeof(errstr));
+    logger_->log_debug("PublishKafka: sasl.kerberos.service.name [%s]", value);
+    if (result != RD_KAFKA_CONF_OK)
+      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+  }
+  value = "";
+  if (context->getProperty(KerberosPrincipal.getName(), value) && !value.empty()) {
+    result = rd_kafka_conf_set(conf_, "sasl.kerberos.principal", value.c_str(), errstr, sizeof(errstr));
+    logger_->log_debug("PublishKafka: sasl.kerberos.principal [%s]", value);
+    if (result != RD_KAFKA_CONF_OK)
+      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+  }
+  value = "";
+  if (context->getProperty(KerberosKeytabPath.getName(), value) && !value.empty()) {
+    result = rd_kafka_conf_set(conf_, "sasl.kerberos.keytab", value.c_str(), errstr, sizeof(errstr));
+    logger_->log_debug("PublishKafka: sasl.kerberos.keytab [%s]", value);
+    if (result != RD_KAFKA_CONF_OK)
+      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+  }
+  value = "";
   if (context->getProperty(SeedBrokers.getName(), value) && !value.empty()) {
     result = rd_kafka_conf_set(conf_, "bootstrap.servers", value.c_str(), errstr, sizeof(errstr));
     logger_->log_debug("PublishKafka: bootstrap.servers [%s]", value);
@@ -234,11 +266,25 @@ void PublishKafka::onSchedule(core::ProcessContext *context, core::ProcessSessio
     return;
   }
 
+    // Add all of the dynamic properties as librdkafka configurations
+    const auto &dynamic_prop_keys = context->getDynamicPropertyKeys();
+    logger_->log_info("PublishKafka registering %d librdkafka dynamic properties", dynamic_prop_keys.size());
+
+    for (const auto &key : dynamic_prop_keys) {
+        value = "";
+        if (context->getDynamicProperty(key, value) && !value.empty()) {
+            logger_->log_debug("PublishKafka: DynamicProperty: [%s] -> [%s]", key, value);
+            rd_kafka_conf_set(conf_, key.c_str(), value.c_str(), errstr, sizeof(errstr));
+        } else {
+            logger_->log_warn("PublishKafka Dynamic Property '%s' is empty and therefore will not be configured", key);
+        }
+    }
+
   rk_= rd_kafka_new(RD_KAFKA_PRODUCER, conf_,
             errstr, sizeof(errstr));
 
   if (!rk_) {
-    logger_->log_error("Failed to create kafak producer %s", errstr);
+    logger_->log_error("Failed to create Kafka producer %s", errstr);
     return;
   }
 
@@ -263,11 +309,13 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
     return;
   }
 
-  std::string kafkaKey = flowFile->getUUIDStr();;
-  std::string value;
-
-  if (flowFile->getAttribute(KAFKA_KEY_ATTRIBUTE, value))
-    kafkaKey = value;
+  std::string kafkaKey;
+  kafkaKey = "";
+  if (context->getDynamicProperty(MessageKeyField, kafkaKey, flowFile) && !kafkaKey.empty()) {
+    logger_->log_debug("PublishKafka: Message Key Field [%s]", kafkaKey);
+  } else {
+    kafkaKey = flowFile->getUUIDStr();
+  }
 
   PublishKafka::ReadCallback callback(max_seg_size_, kafkaKey, rkt_, rk_, flowFile, attributeNameRegex);
   session->read(flowFile, &callback);
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index 46915b9..f31df86 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -96,6 +96,10 @@ public:
   static core::Property SecurityCert;
   static core::Property SecurityPrivateKey;
   static core::Property SecurityPrivateKeyPassWord;
+  static core::Property KerberosServiceName;
+  static core::Property KerberosPrincipal;
+  static core::Property KerberosKeytabPath;
+  static core::Property MessageKeyField;
 
   // Supported Relationships
   static core::Relationship Failure;
@@ -187,6 +191,7 @@ public:
    * ProcessSession objects.
    */
   void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+
   // OnTrigger method, implemented by NiFi PublishKafka
   virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
   }