You are viewing a plain text version of this content. The canonical link to it was a hyperlink on the original archive page and is not preserved in this plain-text copy.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/07/30 13:41:23 UTC

nifi-minifi-cpp git commit: MINIFICPP-427 - add PublishKafka headers support

Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 4fe38ccdf -> 08f4a2b8c


MINIFICPP-427 - add PublishKafka headers support

This closes #360.

Signed-off-by: Marc Parisi <ph...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/08f4a2b8
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/08f4a2b8
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/08f4a2b8

Branch: refs/heads/master
Commit: 08f4a2b8c484c0fff190518fb285ba66d027b46e
Parents: 4fe38cc
Author: Dustin Rodrigues <du...@gmail.com>
Authored: Sat Jun 16 12:46:09 2018 -0400
Committer: Marc Parisi <ph...@apache.org>
Committed: Mon Jul 30 09:32:24 2018 -0400

----------------------------------------------------------------------
 PROCESSORS.md                          |  1 +
 README.md                              |  2 ++
 extensions/librdkafka/PublishKafka.cpp |  9 ++++++-
 extensions/librdkafka/PublishKafka.h   | 39 ++++++++++++++++++++++++++---
 4 files changed, 47 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/08f4a2b8/PROCESSORS.md
----------------------------------------------------------------------
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 0b79963..214d2af 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -1023,6 +1023,7 @@ default values, and whether a property supports the NiFi Expression Language.
 | Request Timeout | | | The ack timeout of the producer request in milliseconds |
 | Client Name | | | Client Name to use when communicating with Kafka |
 | Batch Size | | | Maximum number of messages batched in one MessageSet |
+| Attributes to Send as Headers | | | Any attribute whose name matches the regex will be added to the Kafka messages as a Header |
 | Queue Buffering Max Time | | | Delay to wait for messages in the producer queue to accumulate before constructing message batches |
 | Queue Max Buffer Size | | | Maximum total message size sum allowed on the producer queue |
 | Queue Max Message | | | Maximum number of messages allowed on the producer queue |

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/08f4a2b8/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index c3352d5..3da9a34 100644
--- a/README.md
+++ b/README.md
@@ -118,6 +118,8 @@ or greater is recommended.
 
 **NOTE** if bustache (ApplyTemplate) support is enabled, a recent version of a compiler supporting c++-11 must be used. GCC versions >= 6.3.1 are known to work.
 
+**NOTE** if Kafka support is enabled, a compiler with a working C++11 `<regex>` implementation must be used. GCC versions >= 4.9 are recommended, because the `<regex>` support shipped with earlier GCC releases is incomplete.
+
 **NOTE** if Expression Language support is enabled, FlexLexer must be in the include path and the version must be compatible with the version of flex used when generating lexer sources. Lexer source generation is automatically performed during CMake builds. To re-generate the sources, remove: 
 
  * extensions/expression-language/Parser.cpp

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/08f4a2b8/extensions/librdkafka/PublishKafka.cpp
----------------------------------------------------------------------
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index 5f39e03..656eb32 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -42,6 +42,7 @@ core::Property PublishKafka::MaxMessageSize("Max Request Size", "Maximum Kafka p
 core::Property PublishKafka::RequestTimeOut("Request Timeout", "The ack timeout of the producer request in milliseconds", "");
 core::Property PublishKafka::ClientName("Client Name", "Client Name to use when communicating with Kafka", "");
 core::Property PublishKafka::BatchSize("Batch Size", "Maximum number of messages batched in one MessageSet", "");
+core::Property PublishKafka::AttributeNameRegex("Attributes to Send as Headers", "Any attribute whose name matches the regex will be added to the Kafka messages as a Header", "");
 core::Property PublishKafka::QueueBufferMaxTime("Queue Buffering Max Time", "Delay to wait for messages in the producer queue to accumulate before constructing message batches", "");
 core::Property PublishKafka::QueueBufferMaxSize("Queue Max Buffer Size", "Maximum total message size sum allowed on the producer queue", "");
 core::Property PublishKafka::QueueBufferMaxMessage("Queue Max Message", "Maximum number of messages allowed on the producer queue", "");
@@ -64,6 +65,7 @@ void PublishKafka::initialize() {
   properties.insert(MaxMessageSize);
   properties.insert(RequestTimeOut);
   properties.insert(ClientName);
+  properties.insert(AttributeNameRegex);
   properties.insert(BatchSize);
   properties.insert(QueueBufferMaxTime);
   properties.insert(QueueBufferMaxSize);
@@ -122,6 +124,11 @@ void PublishKafka::onSchedule(core::ProcessContext *context, core::ProcessSessio
       logger_->log_error("PublishKafka: configure error result [%s]", errstr);
   }
   value = "";
+  if (context->getProperty(AttributeNameRegex.getName(), value) && !value.empty()) {
+    attributeNameRegex.assign(value);
+    logger_->log_debug("PublishKafka: AttributeNameRegex %s", value);
+  }
+  value = "";
   if (context->getProperty(QueueBufferMaxSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
       valInt = valInt/1024;
       valueConf = std::to_string(valInt);
@@ -262,7 +269,7 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
   if (flowFile->getAttribute(KAFKA_KEY_ATTRIBUTE, value))
     kafkaKey = value;
 
-  PublishKafka::ReadCallback callback(flowFile->getSize(), max_seg_size_, kafkaKey, rkt_);
+  PublishKafka::ReadCallback callback(max_seg_size_, kafkaKey, rkt_, rk_, flowFile, attributeNameRegex);
   session->read(flowFile, &callback);
   if (callback.status_ < 0) {
     logger_->log_error("Failed to send flow to kafka topic %s", topic_);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/08f4a2b8/extensions/librdkafka/PublishKafka.h
----------------------------------------------------------------------
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index 65ed849..0814dd9 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -28,6 +28,7 @@
 #include "core/Property.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "rdkafka.h"
+#include <regex>
 
 namespace org {
 namespace apache {
@@ -83,6 +84,7 @@ public:
   static core::Property RequestTimeOut;
   static core::Property ClientName;
   static core::Property BatchSize;
+  static core::Property AttributeNameRegex;
   static core::Property QueueBufferMaxTime;
   static core::Property QueueBufferMaxSize;
   static core::Property QueueBufferMaxMessage;
@@ -101,12 +103,17 @@ public:
   // Nest Callback Class for read stream
   class ReadCallback: public InputStreamCallback {
   public:
-    ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string &key, rd_kafka_topic_t *rkt) :
-        flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), rkt_(rkt) {
+    ReadCallback(uint64_t max_seg_size, const std::string &key, rd_kafka_topic_t *rkt, rd_kafka_t *rk, const std::shared_ptr<core::FlowFile> &flowFile, const std::regex &attributeNameRegex)  :
+        max_seg_size_(max_seg_size), key_(key), rkt_(rkt), rk_(rk), flowFile_(flowFile), attributeNameRegex_(attributeNameRegex) {
+      flow_size_ = flowFile_->getSize();
       status_ = 0;
       read_size_ = 0;
+      hdrs = nullptr;
     }
     ~ReadCallback() {
+      if (hdrs) {
+        rd_kafka_headers_destroy(hdrs);
+      }
     }
     int64_t process(std::shared_ptr<io::BaseStream> stream) {
       if (flow_size_ < max_seg_size_)
@@ -115,6 +122,17 @@ public:
       buffer.reserve(max_seg_size_);
       read_size_ = 0;
       status_ = 0;
+      rd_kafka_resp_err_t err;
+
+      for (auto kv : flowFile_->getAttributes()) {
+        if (regex_match(kv.first, attributeNameRegex_)) {
+          if (!hdrs) {
+            hdrs = rd_kafka_headers_new(8);
+          }
+          err = rd_kafka_header_add(hdrs, kv.first.c_str(), kv.first.size(),  kv.second.c_str(), kv.second.size());
+        }
+      }
+
       while (read_size_ < flow_size_) {
         int readRet = stream->read(&buffer[0], max_seg_size_);
         if (readRet < 0) {
@@ -122,7 +140,17 @@ public:
           return read_size_;
         }
         if (readRet > 0) {
-          if (rd_kafka_produce(rkt_, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, &buffer[0], readRet, key_.c_str(), key_.size(), NULL) == -1) {
+          if (hdrs) {
+            rd_kafka_headers_t *hdrs_copy;
+            hdrs_copy = rd_kafka_headers_copy(hdrs);
+            err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(&buffer[0], readRet), RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_END);
+            if (err) {
+              rd_kafka_headers_destroy(hdrs_copy);
+            }
+          } else {
+            err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(&buffer[0], readRet), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_END);
+          }
+          if (err) {
             status_ = -1;
             return read_size_;
           }
@@ -137,8 +165,12 @@ public:
     uint64_t max_seg_size_;
     std::string key_;
     rd_kafka_topic_t *rkt_;
+    rd_kafka_t *rk_;
+    rd_kafka_headers_t *hdrs;
+    std::shared_ptr<core::FlowFile> flowFile_;
     int status_;
     int read_size_;
+    std::regex attributeNameRegex_;
   };
 
 public:
@@ -167,6 +199,7 @@ private:
   rd_kafka_topic_t *rkt_;
   std::string topic_;
   uint64_t max_seg_size_;
+  std::regex attributeNameRegex;
 };
 
 REGISTER_RESOURCE (PublishKafka);