Posted to commits@nifi.apache.org by ab...@apache.org on 2020/08/25 12:12:26 UTC

[nifi-minifi-cpp] branch main updated (832c55c -> 681d0bb)

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git.


    from 832c55c  MINIFICPP-1335 TailFile no longer generates an empty flowfile...
     new d8eeee0  MINIFICPP-1326 improve PublishKafka logging
     new 681d0bb  MINIFICPP-1334 move time utils to a namespace

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 Windows.md                                         |   1 +
 extensions/libarchive/BinFiles.h                   |   4 +-
 extensions/librdkafka/KafkaConnection.cpp          |   2 +-
 extensions/librdkafka/KafkaConnection.h            |  20 +-
 extensions/librdkafka/PublishKafka.cpp             | 348 +++++++++++++++++++--
 extensions/librdkafka/PublishKafka.h               | 266 +---------------
 extensions/sftp/processors/ListSFTP.cpp            |   2 +-
 extensions/sftp/tests/ListSFTPTests.cpp            |   2 +-
 .../standard-processors/processors/GetFile.cpp     |   6 +-
 .../processors/LogAttribute.cpp                    |   4 +-
 libminifi/include/core/FlowFile.h                  |   2 +-
 libminifi/include/core/Processor.h                 |   8 +-
 libminifi/include/core/Property.h                  |   2 +-
 libminifi/include/provenance/Provenance.h          |   2 +-
 libminifi/include/sitetosite/Peer.h                |  12 +-
 libminifi/include/utils/TimeUtil.h                 |  31 +-
 libminifi/src/Connection.cpp                       |   2 +-
 libminifi/src/FlowFileRecord.cpp                   |   2 +-
 libminifi/src/core/FlowFile.cpp                    |   2 +-
 libminifi/src/core/ProcessSession.cpp              |  24 +-
 libminifi/src/provenance/Provenance.cpp            |   2 +-
 libminifi/src/sitetosite/SiteToSiteClient.cpp      |  12 +-
 libminifi/test/unit/FileUtilsTests.cpp             |   6 +-
 libminifi/test/unit/TimeUtilTests.cpp              |   3 +
 win_build_vs.bat                                   |   3 +
 25 files changed, 428 insertions(+), 340 deletions(-)


[nifi-minifi-cpp] 02/02: MINIFICPP-1334 move time utils to a namespace

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 681d0bba1ef80a2b0e0bb4ea09802952dda25994
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Sun Aug 16 20:16:53 2020 +0200

    MINIFICPP-1334 move time utils to a namespace
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #873
---
 extensions/libarchive/BinFiles.h                   |  4 +--
 extensions/sftp/processors/ListSFTP.cpp            |  2 +-
 extensions/sftp/tests/ListSFTPTests.cpp            |  2 +-
 .../standard-processors/processors/GetFile.cpp     |  6 ++---
 .../processors/LogAttribute.cpp                    |  4 +--
 libminifi/include/core/FlowFile.h                  |  2 +-
 libminifi/include/core/Processor.h                 |  8 +++---
 libminifi/include/core/Property.h                  |  2 +-
 libminifi/include/provenance/Provenance.h          |  2 +-
 libminifi/include/sitetosite/Peer.h                | 12 ++++-----
 libminifi/include/utils/TimeUtil.h                 | 31 ++++++++++++++++++----
 libminifi/src/Connection.cpp                       |  2 +-
 libminifi/src/FlowFileRecord.cpp                   |  2 +-
 libminifi/src/core/FlowFile.cpp                    |  2 +-
 libminifi/src/core/ProcessSession.cpp              | 24 ++++++++---------
 libminifi/src/provenance/Provenance.cpp            |  2 +-
 libminifi/src/sitetosite/SiteToSiteClient.cpp      | 12 ++++-----
 libminifi/test/unit/FileUtilsTests.cpp             |  6 ++---
 libminifi/test/unit/TimeUtilTests.cpp              |  3 +++
 19 files changed, 76 insertions(+), 52 deletions(-)

diff --git a/extensions/libarchive/BinFiles.h b/extensions/libarchive/BinFiles.h
index c52bc34..2dd16c8 100644
--- a/extensions/libarchive/BinFiles.h
+++ b/extensions/libarchive/BinFiles.h
@@ -54,7 +54,7 @@ class Bin {
         groupId_(groupId),
         logger_(logging::LoggerFactory<Bin>::getLogger()) {
     queued_data_size_ = 0;
-    creation_dated_ = getTimeMillis();
+    creation_dated_ = utils::timeutils::getTimeMillis();
     std::shared_ptr<utils::IdGenerator> id_generator = utils::IdGenerator::getIdGenerator();
     id_generator->generate(uuid_);
     uuid_str_ = uuid_.to_string();
@@ -76,7 +76,7 @@ class Bin {
   }
   // check whether the bin is older than the time specified in msec
   bool isOlderThan(const uint64_t &duration) {
-    uint64_t currentTime = getTimeMillis();
+    uint64_t currentTime = utils::timeutils::getTimeMillis();
     if (currentTime > (creation_dated_ + duration))
       return true;
     else
diff --git a/extensions/sftp/processors/ListSFTP.cpp b/extensions/sftp/processors/ListSFTP.cpp
index ad0d187..985f3f4 100644
--- a/extensions/sftp/processors/ListSFTP.cpp
+++ b/extensions/sftp/processors/ListSFTP.cpp
@@ -429,7 +429,7 @@ bool ListSFTP::createAndTransferFlowFileFromChild(
     return true;
   }
   std::string mtime_str;
-  if (!getDateTimeStr(static_cast<int64_t>(child.attrs.mtime), mtime_str)) {
+  if (!utils::timeutils::getDateTimeStr(static_cast<int64_t>(child.attrs.mtime), mtime_str)) {
     logger_->log_error("Failed to convert modification date %lu of \"%s/%s\" to string", child.attrs.mtime, child.parent_path.c_str(), child.filename.c_str());
     return true;
   }
diff --git a/extensions/sftp/tests/ListSFTPTests.cpp b/extensions/sftp/tests/ListSFTPTests.cpp
index 74e06cb..72eaeaa 100644
--- a/extensions/sftp/tests/ListSFTPTests.cpp
+++ b/extensions/sftp/tests/ListSFTPTests.cpp
@@ -230,7 +230,7 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list one file writes attributes
   auto file = std::string(src_dir) + "/vfs/nifi_test/tstFile.ext";
   auto mtime = utils::file::FileUtils::last_write_time(file);
   std::string mtime_str;
-  REQUIRE(true == getDateTimeStr(mtime, mtime_str));
+  REQUIRE(true == utils::timeutils::getDateTimeStr(mtime, mtime_str));
   uint64_t uid, gid;
   REQUIRE(true == utils::file::FileUtils::get_uid_gid(file, uid, gid));
   uint32_t permissions;
diff --git a/extensions/standard-processors/processors/GetFile.cpp b/extensions/standard-processors/processors/GetFile.cpp
index eee4eb0..eb8ccb4 100644
--- a/extensions/standard-processors/processors/GetFile.cpp
+++ b/extensions/standard-processors/processors/GetFile.cpp
@@ -164,9 +164,9 @@ void GetFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses
   const bool isDirEmptyBeforePoll = isListingEmpty();
   logger_->log_debug("Is listing empty before polling directory %i", isDirEmptyBeforePoll);
   if (isDirEmptyBeforePoll) {
-    if (request_.pollInterval == 0 || (getTimeMillis() - last_listing_time_) > request_.pollInterval) {
+    if (request_.pollInterval == 0 || (utils::timeutils::getTimeMillis() - last_listing_time_) > request_.pollInterval) {
       performListing(request_);
-      last_listing_time_.store(getTimeMillis());
+      last_listing_time_.store(utils::timeutils::getTimeMillis());
     }
   }
 
@@ -238,7 +238,7 @@ bool GetFile::acceptFile(std::string fullName, std::string name, const GetFileRe
       return false;
 
     uint64_t modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000);
-    uint64_t fileAge = getTimeMillis() - modifiedTime;
+    uint64_t fileAge = utils::timeutils::getTimeMillis() - modifiedTime;
     if (request.minAge > 0 && fileAge < request.minAge)
       return false;
     if (request.maxAge > 0 && fileAge > request.maxAge)
diff --git a/extensions/standard-processors/processors/LogAttribute.cpp b/extensions/standard-processors/processors/LogAttribute.cpp
index ec6539d..3f77141 100644
--- a/extensions/standard-processors/processors/LogAttribute.cpp
+++ b/extensions/standard-processors/processors/LogAttribute.cpp
@@ -127,8 +127,8 @@ void LogAttribute::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
     message << dashLine;
     message << "\nStandard FlowFile Attributes";
     message << "\n" << "UUID:" << flow->getUUIDStr();
-    message << "\n" << "EntryDate:" << getTimeStr(flow->getEntryDate());
-    message << "\n" << "lineageStartDate:" << getTimeStr(flow->getlineageStartDate());
+    message << "\n" << "EntryDate:" << utils::timeutils::getTimeStr(flow->getEntryDate());
+    message << "\n" << "lineageStartDate:" << utils::timeutils::getTimeStr(flow->getlineageStartDate());
     message << "\n" << "Size:" << flow->getSize() << " Offset:" << flow->getOffset();
     message << "\nFlowFile Attributes Map Content";
     std::map<std::string, std::string> attrs = flow->getAttributes();
diff --git a/libminifi/include/core/FlowFile.h b/libminifi/include/core/FlowFile.h
index ce64873..a546953 100644
--- a/libminifi/include/core/FlowFile.h
+++ b/libminifi/include/core/FlowFile.h
@@ -286,7 +286,7 @@ class FlowFile : public core::Connectable, public ReferenceContainer {
 
   // Check whether it is still being penalized
   bool isPenalized() const {
-    return (penaltyExpiration_ms_ > 0 ? penaltyExpiration_ms_ > getTimeMillis() : false);
+    return penaltyExpiration_ms_ > 0 && penaltyExpiration_ms_ > utils::timeutils::getTimeMillis();
   }
 
   uint64_t getId() const {
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index 78ec5ec..4245fe6 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -185,16 +185,16 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
   }
   // Yield based on the yield period
   void yield() override {
-    yield_expiration_ = (getTimeMillis() + yield_period_msec_);
+    yield_expiration_ = (utils::timeutils::getTimeMillis() + yield_period_msec_);
   }
   // Yield based on the input time
   void yield(uint64_t time) {
-    yield_expiration_ = (getTimeMillis() + time);
+    yield_expiration_ = (utils::timeutils::getTimeMillis() + time);
   }
   // whether need be to yield
   bool isYield() {
     if (yield_expiration_ > 0)
-      return (yield_expiration_ >= getTimeMillis());
+      return (yield_expiration_ >= utils::timeutils::getTimeMillis());
     else
       return false;
   }
@@ -204,7 +204,7 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
   }
   // get yield time
   uint64_t getYieldTime() {
-    uint64_t curTime = getTimeMillis();
+    uint64_t curTime = utils::timeutils::getTimeMillis();
     if (yield_expiration_ > curTime)
       return (yield_expiration_ - curTime);
     else
diff --git a/libminifi/include/core/Property.h b/libminifi/include/core/Property.h
index 6e505fb..d832da4 100644
--- a/libminifi/include/core/Property.h
+++ b/libminifi/include/core/Property.h
@@ -258,7 +258,7 @@ class Property {
   }
 
   static bool StringToDateTime(const std::string& input, int64_t& output) {
-    int64_t temp = parseDateTimeStr(input);
+    int64_t temp = utils::timeutils::parseDateTimeStr(input);
     if (temp == -1) {
       return false;
     }
diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h
index f432121..41670e7 100644
--- a/libminifi/include/provenance/Provenance.h
+++ b/libminifi/include/provenance/Provenance.h
@@ -165,7 +165,7 @@ class ProvenanceEventRecord : public core::SerializableComponent {
 
   ProvenanceEventRecord()
       : core::SerializableComponent(core::getClassName<ProvenanceEventRecord>()) {
-    _eventTime = getTimeMillis();
+    _eventTime = utils::timeutils::getTimeMillis();
   }
 
   // Destructor
diff --git a/libminifi/include/sitetosite/Peer.h b/libminifi/include/sitetosite/Peer.h
index 4700a58..b9b243f 100644
--- a/libminifi/include/sitetosite/Peer.h
+++ b/libminifi/include/sitetosite/Peer.h
@@ -199,7 +199,7 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
   }
   // Yield based on the yield period
   void yield() {
-    yield_expiration_ = (getTimeMillis() + yield_period_msec_);
+    yield_expiration_ = (utils::timeutils::getTimeMillis() + yield_period_msec_);
   }
   // setHostName
   void setHostName(std::string host_) {
@@ -221,12 +221,12 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
   }
   // Yield based on the input time
   void yield(uint64_t time) {
-    yield_expiration_ = (getTimeMillis() + time);
+    yield_expiration_ = (utils::timeutils::getTimeMillis() + time);
   }
   // whether need be to yield
   bool isYield() {
     if (yield_expiration_ > 0)
-      return (yield_expiration_ >= getTimeMillis());
+      return (yield_expiration_ >= utils::timeutils::getTimeMillis());
     else
       return false;
   }
@@ -237,13 +237,13 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
   // Yield based on the yield period
   void yield(std::string portId) {
     std::lock_guard<std::mutex> lock(mutex_);
-    uint64_t yieldExpiration = (getTimeMillis() + yield_period_msec_);
+    uint64_t yieldExpiration = (utils::timeutils::getTimeMillis() + yield_period_msec_);
     yield_expiration_PortIdMap[portId] = yieldExpiration;
   }
   // Yield based on the input time
   void yield(std::string portId, uint64_t time) {
     std::lock_guard<std::mutex> lock(mutex_);
-    uint64_t yieldExpiration = (getTimeMillis() + time);
+    uint64_t yieldExpiration = (utils::timeutils::getTimeMillis() + time);
     yield_expiration_PortIdMap[portId] = yieldExpiration;
   }
   // whether need be to yield
@@ -252,7 +252,7 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
     std::map<std::string, uint64_t>::iterator it = this->yield_expiration_PortIdMap.find(portId);
     if (it != yield_expiration_PortIdMap.end()) {
       uint64_t yieldExpiration = it->second;
-      return (yieldExpiration >= getTimeMillis());
+      return (yieldExpiration >= utils::timeutils::getTimeMillis());
     } else {
       return false;
     }
diff --git a/libminifi/include/utils/TimeUtil.h b/libminifi/include/utils/TimeUtil.h
index b8d1693..833b096 100644
--- a/libminifi/include/utils/TimeUtil.h
+++ b/libminifi/include/utils/TimeUtil.h
@@ -17,8 +17,8 @@
 #ifndef LIBMINIFI_INCLUDE_UTILS_TIMEUTIL_H_
 #define LIBMINIFI_INCLUDE_UTILS_TIMEUTIL_H_
 
-#include <string.h>
-#include <time.h>
+#include <cstring>
+#include <ctime>
 
 #include <array>
 #include <chrono>
@@ -30,6 +30,13 @@
 
 #define TIME_FORMAT "%Y-%m-%d %H:%M:%S"
 
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+namespace timeutils {
+
 /**
  * Gets the current time in milliseconds
  * @returns milliseconds since epoch
@@ -66,11 +73,11 @@ inline std::string getTimeStr(uint64_t msec, bool enforce_locale = false) {
   return ret;
 }
 
-inline time_t mkgmtime(struct tm *date_time) {
+inline time_t mkgmtime(struct tm* date_time) {
 #ifdef WIN32
   return _mkgmtime(date_time);
 #else
-  static const int month_lengths[] =      {31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31};
+  static const int month_lengths[] = {31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31};
   static const int month_lengths_leap[] = {31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31};
   static const auto is_leap_year = [](int year) -> bool {
     return year % 4 == 0 && (year % 100 != 0 || year % 400 == 0);
@@ -107,7 +114,7 @@ inline time_t mkgmtime(struct tm *date_time) {
  * @param str the datetime string
  * @returns Unix timestamp
  */
-inline int64_t parseDateTimeStr(const std::string &str) {
+inline int64_t parseDateTimeStr(const std::string& str) {
   /**
    * There is no strptime on Windows. As long as we have to parse a single date format this is not so bad,
    * but if multiple formats will have to be supported in the future, it might be worth it to include
@@ -167,4 +174,18 @@ inline bool getDateTimeStr(int64_t unix_timestamp, std::string& date_time_str) {
   return true;
 }
 
+} /* namespace timeutils */
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+// for backwards compatibility, to be removed after 0.8
+using org::apache::nifi::minifi::utils::timeutils::getTimeMillis;
+using org::apache::nifi::minifi::utils::timeutils::getTimeNano;
+using org::apache::nifi::minifi::utils::timeutils::getTimeStr;
+using org::apache::nifi::minifi::utils::timeutils::parseDateTimeStr;
+using org::apache::nifi::minifi::utils::timeutils::getDateTimeStr;
+
 #endif  // LIBMINIFI_INCLUDE_UTILS_TIMEUTIL_H_
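
(Aside, not part of the commit: after this change the helpers live in
org::apache::nifi::minifi::utils::timeutils, and the using-declarations above
keep old unqualified call sites compiling until their planned removal after
0.8. A minimal sketch of the calling styles seen in the diffs:)

    #include "utils/TimeUtil.h"

    // fully qualified, valid from anywhere:
    uint64_t a = org::apache::nifi::minifi::utils::timeutils::getTimeMillis();

    // the shorter form used at call sites inside the minifi namespaces:
    // uint64_t b = utils::timeutils::getTimeMillis();

    // unqualified: works only through the backwards-compatibility
    // using-declarations above, to be removed after 0.8
    uint64_t c = getTimeMillis();
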
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index 334d960..c009740 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -188,7 +188,7 @@ std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::
 
     if (expired_duration_ > 0) {
       // We need to check for flow expiration
-      if (getTimeMillis() > (item->getEntryDate() + expired_duration_)) {
+      if (utils::timeutils::getTimeMillis() > (item->getEntryDate() + expired_duration_)) {
         // Flow record expired
         expiredFlowRecords.insert(item);
         logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 0d66412..358b589 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -50,7 +50,7 @@ FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository
   // Increase the local ID for the flow record
   ++local_flow_seq_number_;
   // Populate the default attributes
-  addKeyedAttribute(FILENAME, std::to_string(getTimeNano()));
+  addKeyedAttribute(FILENAME, std::to_string(utils::timeutils::getTimeNano()));
   addKeyedAttribute(PATH, DEFAULT_FLOWFILE_PATH);
   addKeyedAttribute(UUID, getUUIDStr());
   // Populate the attributes from the input
diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp
index f94f2c9..f5765f8 100644
--- a/libminifi/src/core/FlowFile.cpp
+++ b/libminifi/src/core/FlowFile.cpp
@@ -46,7 +46,7 @@ FlowFile::FlowFile()
       connection_(nullptr),
       original_connection_() {
   id_ = numeric_id_generator_->generateId();
-  entry_date_ = getTimeMillis();
+  entry_date_ = utils::timeutils::getTimeMillis();
   event_time_ = entry_date_;
   lineage_start_date_ = entry_date_;
 }
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 4266d24..df67f62 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -231,7 +231,7 @@ void ProcessSession::removeAttribute(const std::shared_ptr<core::FlowFile> &flow
 void ProcessSession::penalize(const std::shared_ptr<core::FlowFile> &flow) {
   uint64_t penalization_period = process_context_->getProcessorNode()->getPenalizationPeriodMsec();
   logging::LOG_INFO(logger_) << "Penalizing " << flow->getUUIDStr() << " for " << penalization_period << "ms at " << process_context_->getProcessorNode()->getName();
-  flow->setPenaltyExpiration(getTimeMillis() + penalization_period);
+  flow->setPenaltyExpiration(utils::timeutils::getTimeMillis() + penalization_period);
 }
 
 void ProcessSession::transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship) {
@@ -244,7 +244,7 @@ void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, OutputSt
   std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
 
   try {
-    uint64_t startTime = getTimeMillis();
+    uint64_t startTime = utils::timeutils::getTimeMillis();
     std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
     // Call the callback to write the content
     if (nullptr == stream) {
@@ -262,7 +262,7 @@ void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, OutputSt
 
     stream->closeStream();
     std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flow->getUUIDStr();
-    uint64_t endTime = getTimeMillis();
+    uint64_t endTime = utils::timeutils::getTimeMillis();
     provenance_report_->modifyContent(flow, details, endTime - startTime);
   } catch (std::exception &exception) {
     logger_->log_debug("Caught Exception %s", exception.what());
@@ -281,7 +281,7 @@ void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, OutputS
   }
 
   try {
-    uint64_t startTime = getTimeMillis();
+    uint64_t startTime = utils::timeutils::getTimeMillis();
     std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim, true);
     if (nullptr == stream) {
       rollback();
@@ -301,7 +301,7 @@ void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, OutputS
 
     std::stringstream details;
     details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr();
-    uint64_t endTime = getTimeMillis();
+    uint64_t endTime = utils::timeutils::getTimeMillis();
     provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
   } catch (std::exception &exception) {
     logger_->log_debug("Caught Exception %s", exception.what());
@@ -361,7 +361,7 @@ void ProcessSession::importFrom(io::DataStream &stream, const std::shared_ptr<co
   std::vector<uint8_t> charBuffer(max_read);
 
   try {
-    auto startTime = getTimeMillis();
+    auto startTime = utils::timeutils::getTimeMillis();
     std::shared_ptr<io::BaseStream> content_stream = process_context_->getContentRepository()->write(claim);
 
     if (nullptr == content_stream) {
@@ -390,7 +390,7 @@ void ProcessSession::importFrom(io::DataStream &stream, const std::shared_ptr<co
     content_stream->closeStream();
     std::stringstream details;
     details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr();
-    auto endTime = getTimeMillis();
+    auto endTime = utils::timeutils::getTimeMillis();
     provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
   } catch (std::exception &exception) {
     logger_->log_debug("Caught Exception %s", exception.what());
@@ -407,7 +407,7 @@ void ProcessSession::import(std::string source, const std::shared_ptr<core::Flow
   std::vector<uint8_t> charBuffer(size);
 
   try {
-    auto startTime = getTimeMillis();
+    auto startTime = utils::timeutils::getTimeMillis();
     std::ifstream input;
     input.open(source.c_str(), std::fstream::in | std::fstream::binary);
     std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
@@ -454,7 +454,7 @@ void ProcessSession::import(std::string source, const std::shared_ptr<core::Flow
           std::remove(source.c_str());
         std::stringstream details;
         details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr();
-        auto endTime = getTimeMillis();
+        auto endTime = utils::timeutils::getTimeMillis();
         provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
       } else {
         stream->closeStream();
@@ -508,7 +508,7 @@ void ProcessSession::import(const std::string& source, std::vector<std::shared_p
       uint8_t* begin = buffer.data();
       uint8_t* end = begin + read;
       while (true) {
-        startTime = getTimeMillis();
+        startTime = utils::timeutils::getTimeMillis();
         uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
         const auto len = gsl::narrow<int>(delimiterPos - begin);
 
@@ -524,7 +524,7 @@ void ProcessSession::import(const std::string& source, std::vector<std::shared_p
 
         /* Create claim and stream if needed and append data */
         if (claim == nullptr) {
-          startTime = getTimeMillis();
+          startTime = utils::timeutils::getTimeMillis();
           claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
         }
         if (stream == nullptr) {
@@ -553,7 +553,7 @@ void ProcessSession::import(const std::string& source, std::vector<std::shared_p
                                     << ", FlowFile UUID " << flowFile->getUUIDStr();
         stream->closeStream();
         std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
-        uint64_t endTime = getTimeMillis();
+        uint64_t endTime = utils::timeutils::getTimeMillis();
         provenance_report_->modifyContent(flowFile, details, endTime - startTime);
         flows.push_back(flowFile);
 
diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp
index 16126d6..2578a4b 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -49,7 +49,7 @@ ProvenanceEventRecord::ProvenanceEventRecord(ProvenanceEventRecord::ProvenanceEv
   _eventType = event;
   _componentId = componentId;
   _componentType = componentType;
-  _eventTime = getTimeMillis();
+  _eventTime = utils::timeutils::getTimeMillis();
 }
 
 // DeSerialize
diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp
index 40af602..0099c31 100644
--- a/libminifi/src/sitetosite/SiteToSiteClient.cpp
+++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -138,11 +138,11 @@ bool SiteToSiteClient::transferFlowFiles(const std::shared_ptr<core::ProcessCont
   }
 
   bool continueTransaction = true;
-  uint64_t startSendingNanos = getTimeNano();
+  uint64_t startSendingNanos = utils::timeutils::getTimeNano();
 
   try {
     while (continueTransaction) {
-      uint64_t startTime = getTimeMillis();
+      uint64_t startTime = utils::timeutils::getTimeMillis();
       std::string payload;
       DataPacket packet(getLogger(), transaction, flow->getAttributes(), payload);
 
@@ -153,14 +153,14 @@ bool SiteToSiteClient::transferFlowFiles(const std::shared_ptr<core::ProcessCont
 
       logger_->log_debug("Site2Site transaction %s send flow record %s", transactionID, flow->getUUIDStr());
       if (resp == 0) {
-        uint64_t endTime = getTimeMillis();
+        uint64_t endTime = utils::timeutils::getTimeMillis();
         std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr();
         std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host=" + peer_->getHostName();
         session->getProvenanceReporter()->send(flow, transitUri, details, endTime - startTime, false);
       }
       session->remove(flow);
 
-      uint64_t transferNanos = getTimeNano() - startSendingNanos;
+      uint64_t transferNanos = utils::timeutils::getTimeNano() - startSendingNanos;
       if (transferNanos > _batchSendNanos)
         break;
 
@@ -668,7 +668,7 @@ bool SiteToSiteClient::receiveFlowFiles(const std::shared_ptr<core::ProcessConte
   try {
     while (true) {
       std::map<std::string, std::string> empty;
-      uint64_t startTime = getTimeMillis();
+      uint64_t startTime = utils::timeutils::getTimeMillis();
       std::string payload;
       DataPacket packet(getLogger(), transaction, empty, payload);
       bool eof = false;
@@ -705,7 +705,7 @@ bool SiteToSiteClient::receiveFlowFiles(const std::shared_ptr<core::ProcessConte
         }
       }
       core::Relationship relation;  // undefined relationship
-      uint64_t endTime = getTimeMillis();
+      uint64_t endTime = utils::timeutils::getTimeMillis();
       std::string transitUri = peer_->getURL() + "/" + sourceIdentifier;
       std::string details = "urn:nifi:" + sourceIdentifier + "Remote Host=" + peer_->getHostName();
       session->getProvenanceReporter()->receive(flowFile, transitUri, sourceIdentifier, details, endTime - startTime);
diff --git a/libminifi/test/unit/FileUtilsTests.cpp b/libminifi/test/unit/FileUtilsTests.cpp
index 1cd4f82..6ead1a8 100644
--- a/libminifi/test/unit/FileUtilsTests.cpp
+++ b/libminifi/test/unit/FileUtilsTests.cpp
@@ -183,7 +183,7 @@ TEST_CASE("TestFileUtils::getFullPath", "[TestGetFullPath]") {
 TEST_CASE("FileUtils::last_write_time and last_write_time_point work", "[last_write_time][last_write_time_point]") {
   using namespace std::chrono;
 
-  uint64_t time_before_write = getTimeMillis() / 1000;
+  uint64_t time_before_write = utils::timeutils::getTimeMillis() / 1000;
   time_point<system_clock, seconds> time_point_before_write = time_point_cast<seconds>(system_clock::now());
 
   TestController testController;
@@ -199,7 +199,7 @@ TEST_CASE("FileUtils::last_write_time and last_write_time_point work", "[last_wr
   test_file_stream << "foo\n";
   test_file_stream.flush();
 
-  uint64_t time_after_first_write = getTimeMillis() / 1000;
+  uint64_t time_after_first_write = utils::timeutils::getTimeMillis() / 1000;
   time_point<system_clock, seconds> time_point_after_first_write = time_point_cast<seconds>(system_clock::now());
 
   uint64_t first_mtime = FileUtils::last_write_time(test_file);
@@ -213,7 +213,7 @@ TEST_CASE("FileUtils::last_write_time and last_write_time_point work", "[last_wr
   test_file_stream << "bar\n";
   test_file_stream.flush();
 
-  uint64_t time_after_second_write = getTimeMillis() / 1000;
+  uint64_t time_after_second_write = utils::timeutils::getTimeMillis() / 1000;
   time_point<system_clock, seconds> time_point_after_second_write = time_point_cast<seconds>(system_clock::now());
 
   uint64_t second_mtime = FileUtils::last_write_time(test_file);
diff --git a/libminifi/test/unit/TimeUtilTests.cpp b/libminifi/test/unit/TimeUtilTests.cpp
index d52535e..7c5974d 100644
--- a/libminifi/test/unit/TimeUtilTests.cpp
+++ b/libminifi/test/unit/TimeUtilTests.cpp
@@ -35,6 +35,7 @@ namespace {
   }
 
   void mkgmtimeTestHelper(time_t expected, int year, int month, int day, int hour, int minute, int second) {
+    using org::apache::nifi::minifi::utils::timeutils::mkgmtime;
     struct tm date_time = createTm(year, month, day, hour, minute, second);
     REQUIRE(mkgmtime(&date_time) == expected);
   }
@@ -65,6 +66,7 @@ TEST_CASE("mkgmtime() works correctly", "[mkgmtime]") {
 }
 
 TEST_CASE("parseDateTimeStr() works correctly", "[parseDateTimeStr]") {
+  using org::apache::nifi::minifi::utils::timeutils::parseDateTimeStr;
   REQUIRE(parseDateTimeStr("1970-01-01T00:00:00Z") == 0);
   REQUIRE(parseDateTimeStr("1970-01-01T00:59:59Z") == ONE_HOUR - 1);
 
@@ -87,5 +89,6 @@ TEST_CASE("parseDateTimeStr() works correctly", "[parseDateTimeStr]") {
 }
 
 TEST_CASE("Test time conversion", "[testtimeconversion]") {
+  using org::apache::nifi::minifi::utils::timeutils::getTimeStr;
   REQUIRE("2017-02-16 20:14:56.196" == getTimeStr(1487276096196, true));
 }


[nifi-minifi-cpp] 01/02: MINIFICPP-1326 improve PublishKafka logging

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit d8eeee0de7fa74adfefcdeeb58f532e788d9c157
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Tue Aug 11 03:50:17 2020 +0200

    MINIFICPP-1326 improve PublishKafka logging
    
    ..., move implementation details to the .cpp file, and
    other small refactors and improvements
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #868
---
 Windows.md                                |   1 +
 extensions/librdkafka/KafkaConnection.cpp |   2 +-
 extensions/librdkafka/KafkaConnection.h   |  20 +-
 extensions/librdkafka/PublishKafka.cpp    | 348 +++++++++++++++++++++++++++---
 extensions/librdkafka/PublishKafka.h      | 266 ++---------------------
 win_build_vs.bat                          |   3 +
 6 files changed, 352 insertions(+), 288 deletions(-)

diff --git a/Windows.md b/Windows.md
index d453955..c2400be 100644
--- a/Windows.md
+++ b/Windows.md
@@ -62,6 +62,7 @@ After the build directory it will take optional parameters modifying the CMake c
 | /J | Enables JNI |
 | /64 | Creates 64-bit build instead of a 32-bit one |
 | /D | Builds RelWithDebInfo build instead of Release |
+| /DD | Builds Debug build instead of Release |
 | /CI | Sets STRICT_GSL_CHECKS to AUDIT |
 
 Examples:
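
(Aside, not part of the commit: a hypothetical invocation combining the new
/DD switch with the existing /64 switch; the build directory name is made up,
only its position as the first argument is described above:)

    win_build_vs.bat build_debug_x64 /64 /DD

This would configure a 64-bit Debug build instead of the default 32-bit
Release one.
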
diff --git a/extensions/librdkafka/KafkaConnection.cpp b/extensions/librdkafka/KafkaConnection.cpp
index cf871a8..3d412ad 100644
--- a/extensions/librdkafka/KafkaConnection.cpp
+++ b/extensions/librdkafka/KafkaConnection.cpp
@@ -87,7 +87,7 @@ std::shared_ptr<KafkaTopic> KafkaConnection::getTopic(const std::string &topic)
   return nullptr;
 }
 
-KafkaConnectionKey const * const KafkaConnection::getKey() const {
+KafkaConnectionKey const* KafkaConnection::getKey() const {
   return &key_;
 }
 
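(Aside, not part of the commit: the dropped second `const` qualified the
returned pointer value itself; since a returned prvalue of non-class type
loses its cv-qualification anyway, it had no effect for callers, and
compilers commonly warn that such a qualifier is ignored. Either way the
pointee stays const:)

    struct KafkaConnectionKey;  // as declared in KafkaConnection.h

    struct KafkaConnection {
      KafkaConnectionKey const* getKey() const;  // callers may not modify *getKey()
    };
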
diff --git a/extensions/librdkafka/KafkaConnection.h b/extensions/librdkafka/KafkaConnection.h
index 816b40a..8d5b12e 100644
--- a/extensions/librdkafka/KafkaConnection.h
+++ b/extensions/librdkafka/KafkaConnection.h
@@ -37,14 +37,16 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-class KafkaConnectionKey {
- public:
-    std::string client_id_;
-    std::string brokers_;
-
-    bool operator <(const KafkaConnectionKey& rhs) const {
-      return std::tie(brokers_, client_id_) < std::tie(rhs.brokers_, rhs.client_id_);
-    }
+struct KafkaConnectionKey {
+  std::string client_id_;
+  std::string brokers_;
+
+  bool operator< (const KafkaConnectionKey& rhs) const { return std::tie(brokers_, client_id_) <  std::tie(rhs.brokers_, rhs.client_id_); }
+  bool operator<=(const KafkaConnectionKey& rhs) const { return std::tie(brokers_, client_id_) <= std::tie(rhs.brokers_, rhs.client_id_); }
+  bool operator==(const KafkaConnectionKey& rhs) const { return std::tie(brokers_, client_id_) == std::tie(rhs.brokers_, rhs.client_id_); }
+  bool operator!=(const KafkaConnectionKey& rhs) const { return std::tie(brokers_, client_id_) != std::tie(rhs.brokers_, rhs.client_id_); }
+  bool operator> (const KafkaConnectionKey& rhs) const { return std::tie(brokers_, client_id_) >  std::tie(rhs.brokers_, rhs.client_id_); }
+  bool operator>=(const KafkaConnectionKey& rhs) const { return std::tie(brokers_, client_id_) >= std::tie(rhs.brokers_, rhs.client_id_); }
 };
 
 class KafkaConnection {
@@ -70,7 +72,7 @@ class KafkaConnection {
 
   std::shared_ptr<KafkaTopic> getTopic(const std::string &topic) const;
 
-  KafkaConnectionKey const * const getKey() const;
+  KafkaConnectionKey const* getKey() const;
 
   void putTopic(const std::string &topicName, const std::shared_ptr<KafkaTopic> &topic);
 
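(Aside, not part of the commit: std::tie packs the members into a tuple of
references, and tuples compare lexicographically, so every operator above
orders keys by brokers_ first, then client_id_. A runnable sketch with a
trimmed stand-in for KafkaConnectionKey and made-up values:)

    #include <cassert>
    #include <string>
    #include <tuple>

    struct Key {  // stand-in for KafkaConnectionKey
      std::string client_id_;
      std::string brokers_;
      bool operator<(const Key& rhs) const {
        return std::tie(brokers_, client_id_) < std::tie(rhs.brokers_, rhs.client_id_);
      }
    };

    int main() {
      Key a{"client-a", "broker1:9092"};
      Key b{"client-b", "broker1:9092"};
      assert(a < b);  // brokers_ compare equal, so client_id_ decides
      return 0;
    }
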
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index fc5ee56..1b92edc 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -39,6 +39,21 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
+#define COMPRESSION_CODEC_NONE "none"
+#define COMPRESSION_CODEC_GZIP "gzip"
+#define COMPRESSION_CODEC_SNAPPY "snappy"
+#define ROUND_ROBIN_PARTITIONING "Round Robin"
+#define RANDOM_PARTITIONING "Random Robin"
+#define USER_DEFINED_PARTITIONING "User-Defined"
+#define DELIVERY_REPLICATED "all"
+#define DELIVERY_ONE_NODE "1"
+#define DELIVERY_BEST_EFFORT "0"
+#define SECURITY_PROTOCOL_PLAINTEXT "plaintext"
+#define SECURITY_PROTOCOL_SSL "ssl"
+#define SECURITY_PROTOCOL_SASL_PLAINTEXT "sasl_plaintext"
+#define SECURITY_PROTOCOL_SASL_SSL "sasl_ssl"
+#define KAFKA_KEY_ATTRIBUTE "kafka.key"
+
 const core::Property PublishKafka::SeedBrokers(
     core::PropertyBuilder::createProperty("Known Brokers")->withDescription("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
         ->isRequired(true)->supportsExpressionLanguage(true)->build());
@@ -144,6 +159,305 @@ struct rd_kafka_conf_deleter {
 struct rd_kafka_topic_conf_deleter {
   void operator()(rd_kafka_topic_conf_t* p) const noexcept { rd_kafka_topic_conf_destroy(p); }
 };
+
+// Message
+enum class MessageStatus : uint8_t {
+  InFlight,
+  Error,
+  Success
+};
+
+const char* to_string(const MessageStatus s) {
+  switch (s) {
+    case MessageStatus::InFlight: return "InFlight";
+    case MessageStatus::Error: return "Error";
+    case MessageStatus::Success: return "Success";
+  }
+  throw std::runtime_error{"PublishKafka to_string(MessageStatus): unreachable code"};
+}
+
+struct MessageResult {
+  MessageStatus status = MessageStatus::InFlight;
+  rd_kafka_resp_err_t err_code = RD_KAFKA_RESP_ERR_NO_ERROR;
+};
+
+struct FlowFileResult {
+  bool flow_file_error = false;
+  std::vector<MessageResult> messages;
+};
+}  // namespace
+
+class PublishKafka::Messages {
+  std::mutex mutex_;
+  std::condition_variable cv_;
+  std::vector<FlowFileResult> flow_files_;
+  bool interrupted_ = false;
+  const std::shared_ptr<logging::Logger> logger_;
+
+  std::string logStatus(const std::unique_lock<std::mutex>& lock) const {
+    gsl_Expects(lock.owns_lock());
+    const auto messageresult_ok = [](const MessageResult r) { return r.status == MessageStatus::Success && r.err_code == RD_KAFKA_RESP_ERR_NO_ERROR; };
+    const auto messageresult_inflight = [](const MessageResult r) { return r.status == MessageStatus::InFlight && r.err_code == RD_KAFKA_RESP_ERR_NO_ERROR; };
+    std::vector<size_t> flow_files_in_flight;
+    std::ostringstream oss;
+    if (interrupted_) { oss << "interrupted, "; }
+    for (size_t ffi = 0; ffi < flow_files_.size(); ++ffi) {
+      const auto& flow_file = flow_files_[ffi];
+      if (!flow_file.flow_file_error && std::all_of(std::begin(flow_file.messages), std::end(flow_file.messages), messageresult_ok)) {
+        continue;  // don't log the happy path to reduce log spam
+      }
+      if (!flow_file.flow_file_error && std::all_of(std::begin(flow_file.messages), std::end(flow_file.messages), messageresult_inflight)) {
+        flow_files_in_flight.push_back(ffi);
+        continue;  // don't log fully in-flight flow files here, log them at the end instead
+      }
+      oss << '[' << ffi << "]: {";
+      if (flow_file.flow_file_error) { oss << "error, "; }
+      for (size_t msgi = 0; msgi < flow_file.messages.size(); ++msgi) {
+        const auto& msg = flow_file.messages[msgi];
+        if (messageresult_ok(msg)) {
+          continue;
+        }
+        oss << '<' << msgi << ">: (msg " << to_string(msg.status) << ", " << rd_kafka_err2str(msg.err_code) << "), ";
+      }
+      oss << "}, ";
+    }
+    oss << "in-flight (" << flow_files_in_flight.size() << "): " << utils::StringUtils::join(",", flow_files_in_flight);
+    return oss.str();
+  }
+
+ public:
+  explicit Messages(std::shared_ptr<logging::Logger> logger) :logger_{std::move(logger)} {}
+
+  void waitForCompletion() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    cv_.wait(lock, [this, &lock] {
+      if (logger_->should_log(logging::LOG_LEVEL::trace)) {
+        logger_->log_trace("%s", logStatus(lock));
+      }
+      return interrupted_ || std::all_of(std::begin(this->flow_files_), std::end(this->flow_files_), [](const FlowFileResult& flow_file) {
+        return flow_file.flow_file_error || std::all_of(std::begin(flow_file.messages), std::end(flow_file.messages), [](const MessageResult& message) {
+          return message.status != MessageStatus::InFlight;
+        });
+      });
+    });
+  }
+
+  template<typename Func>
+  auto modifyResult(size_t index, Func fun) -> decltype(fun(flow_files_.at(index))) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    const auto notifier = gsl::finally([this]{ cv_.notify_all(); });
+    try {
+      return fun(flow_files_.at(index));
+    } catch(const std::exception& ex) {
+      logger_->log_warn("Messages::modifyResult exception: %s", ex.what());
+      throw;
+    }
+  }
+
+  size_t addFlowFile() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    flow_files_.emplace_back();
+    return flow_files_.size() - 1;
+  }
+
+  template<typename Func>
+  auto iterateFlowFiles(Func fun) -> utils::void_t<decltype(fun(size_t{0}, flow_files_.front()))> {
+    std::lock_guard<std::mutex> lock(mutex_);
+    for (size_t index = 0U; index < flow_files_.size(); index++) {
+      fun(index, flow_files_[index]);
+    }
+  }
+
+  void interrupt() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    interrupted_ = true;
+    cv_.notify_all();
+    gsl_Ensures(interrupted_);
+  }
+
+  bool wasInterrupted() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return interrupted_;
+  }
+};
+
+namespace {
+class ReadCallback : public InputStreamCallback {
+ public:
+  struct rd_kafka_headers_deleter {
+    void operator()(rd_kafka_headers_t* ptr) const noexcept {
+      rd_kafka_headers_destroy(ptr);
+    }
+  };
+
+  using rd_kafka_headers_unique_ptr = std::unique_ptr<rd_kafka_headers_t, rd_kafka_headers_deleter>;
+
+ private:
+  void allocate_message_object(const size_t segment_num) const {
+    messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file) {
+      // allocate message object to be filled in by the callback in produce()
+      if (flow_file.messages.size() < segment_num + 1) {
+        flow_file.messages.resize(segment_num + 1);
+      }
+      gsl_Ensures(flow_file.messages.size() > segment_num);
+    });
+  }
+
+  static rd_kafka_headers_unique_ptr make_headers(const core::FlowFile& flow_file, utils::Regex& attribute_name_regex) {
+    const gsl::owner<rd_kafka_headers_t*> result{ rd_kafka_headers_new(8) };
+    if (!result) { throw std::bad_alloc{}; }
+
+    for (const auto& kv : flow_file.getAttributes()) {
+      if (attribute_name_regex.match(kv.first)) {
+        rd_kafka_header_add(result, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
+      }
+    }
+    return rd_kafka_headers_unique_ptr{ result };
+  }
+
+  rd_kafka_resp_err_t produce(const size_t segment_num, std::vector<unsigned char>& buffer, const size_t buflen) const {
+    const std::shared_ptr<PublishKafka::Messages> messages_ptr_copy = this->messages_;
+    const auto flow_file_index_copy = this->flow_file_index_;
+    const auto logger = logger_;
+    const auto produce_callback = [messages_ptr_copy, flow_file_index_copy, segment_num, logger](rd_kafka_t * /*rk*/, const rd_kafka_message_t *rkmessage) {
+      messages_ptr_copy->modifyResult(flow_file_index_copy, [segment_num, rkmessage, logger, flow_file_index_copy](FlowFileResult &flow_file) {
+        auto &message = flow_file.messages.at(segment_num);
+        message.err_code = rkmessage->err;
+        message.status = message.err_code == 0 ? MessageStatus::Success : MessageStatus::Error;
+        if (message.err_code != RD_KAFKA_RESP_ERR_NO_ERROR) {
+          logger->log_warn("delivery callback, flow file #%zu/segment #%zu: %s", flow_file_index_copy, segment_num, rd_kafka_err2str(message.err_code));
+        } else {
+          logger->log_debug("delivery callback, flow file #%zu/segment #%zu: success", flow_file_index_copy, segment_num);
+        }
+      });
+    };
+    // release()d below, deallocated in PublishKafka::messageDeliveryCallback
+    auto callback_ptr = utils::make_unique<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>>(std::move(produce_callback));
+
+    allocate_message_object(segment_num);
+
+    const gsl::owner<rd_kafka_headers_t*> hdrs_copy = rd_kafka_headers_copy(hdrs.get());
+    const auto err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+        RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.get()), RD_KAFKA_V_END);
+    if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
+      // in case of failure, messageDeliveryCallback is not called and callback_ptr will delete the callback
+      // in case of success, messageDeliveryCallback takes ownership of the callback, so we no longer need to delete it
+      (void)callback_ptr.release();
+    } else {
+      // in case of failure, rd_kafka_producev doesn't take ownership of the headers, so we need to delete them
+      rd_kafka_headers_destroy(hdrs_copy);
+    }
+    logger_->log_trace("produce enqueued flow file #%zu/segment #%zu: %s", flow_file_index_, segment_num, rd_kafka_err2str(err));
+    return err;
+  }
+
+ public:
+  ReadCallback(const uint64_t max_seg_size,
+      std::string key,
+      rd_kafka_topic_t* const rkt,
+      rd_kafka_t* const rk,
+      const core::FlowFile& flowFile,
+      utils::Regex& attributeNameRegex,
+      std::shared_ptr<PublishKafka::Messages> messages,
+      const size_t flow_file_index,
+      const bool fail_empty_flow_files,
+      std::shared_ptr<logging::Logger> logger)
+      : flow_size_(flowFile.getSize()),
+      max_seg_size_(max_seg_size == 0 || flow_size_ < max_seg_size ? flow_size_ : max_seg_size),
+      key_(std::move(key)),
+      rkt_(rkt),
+      rk_(rk),
+      hdrs(make_headers(flowFile, attributeNameRegex)),
+      messages_(std::move(messages)),
+      flow_file_index_(flow_file_index),
+      fail_empty_flow_files_(fail_empty_flow_files),
+      logger_(std::move(logger))
+  { }
+
+  ReadCallback(const ReadCallback&) = delete;
+  ReadCallback& operator=(ReadCallback) = delete;
+
+  int64_t process(const std::shared_ptr<io::BaseStream> stream) override {
+    std::vector<unsigned char> buffer;
+
+    buffer.resize(max_seg_size_);
+    read_size_ = 0;
+    status_ = 0;
+    called_ = true;
+
+    gsl_Expects(max_seg_size_ != 0 || flow_size_ == 0 && "max_seg_size_ == 0 implies flow_size_ == 0");
+    // ^^ therefore checking max_seg_size_ == 0 handles both division by zero and flow_size_ == 0 cases
+    const size_t reserved_msg_capacity = max_seg_size_ == 0 ? 1 : utils::intdiv_ceil(flow_size_, max_seg_size_);
+    messages_->modifyResult(flow_file_index_, [reserved_msg_capacity](FlowFileResult& flow_file) {
+      flow_file.messages.reserve(reserved_msg_capacity);
+    });
+
+    // If the flow file is empty, we still want to send the message, unless the user wants to fail_empty_flow_files_
+    if (flow_size_ == 0 && !fail_empty_flow_files_) {
+      const auto err = produce(0, buffer, 0);
+      if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
+        status_ = -1;
+        error_ = rd_kafka_err2str(err);
+      }
+      return 0;
+    }
+
+    for (size_t segment_num = 0; read_size_ < flow_size_; ++segment_num) {
+      const int readRet = stream->read(buffer.data(), buffer.size());
+      if (readRet < 0) {
+        status_ = -1;
+        error_ = "Failed to read from stream";
+        return read_size_;
+      }
+
+      if (readRet <= 0) { break; }
+
+      const auto err = produce(segment_num, buffer, readRet);
+      if (err) {
+        messages_->modifyResult(flow_file_index_, [segment_num, err](FlowFileResult& flow_file) {
+          auto& message = flow_file.messages.at(segment_num);
+          message.status = MessageStatus::Error;
+          message.err_code = err;
+        });
+        status_ = -1;
+        error_ = rd_kafka_err2str(err);
+        return read_size_;
+      }
+      read_size_ += readRet;
+    }
+    return read_size_;
+  }
+
+  const uint64_t flow_size_ = 0;
+  const uint64_t max_seg_size_ = 0;
+  const std::string key_;
+  rd_kafka_topic_t* const rkt_ = nullptr;
+  rd_kafka_t* const rk_ = nullptr;
+  const rd_kafka_headers_unique_ptr hdrs;  // not null
+  const std::shared_ptr<PublishKafka::Messages> messages_;
+  const size_t flow_file_index_;
+  int status_ = 0;
+  std::string error_;
+  int read_size_ = 0;
+  bool called_ = false;
+  const bool fail_empty_flow_files_ = true;
+  const std::shared_ptr<logging::Logger> logger_;
+};
+
+/**
+ * Message delivery report callback using the richer rd_kafka_message_t object.
+ */
+void messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* /*opaque*/) {
+  if (rkmessage->_private == nullptr) {
+    return;
+  }
+  // allocated in ReadCallback::produce
+  auto* const func = reinterpret_cast<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>*>(rkmessage->_private);
+  try {
+    (*func)(rk, rkmessage);
+  } catch (...) { }
+  delete func;
+}
 }  // namespace
 
 void PublishKafka::initialize() {
@@ -235,20 +549,6 @@ void PublishKafka::notifyStop() {
   conn_.reset();
 }
 
-/**
- * Message delivery report callback using the richer rd_kafka_message_t object.
- */
-void PublishKafka::messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* /*opaque*/) {
-  if (rkmessage->_private == nullptr) {
-    return;
-  }
-  // allocated in PublishKafka::ReadCallback::produce
-  auto* func = reinterpret_cast<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>*>(rkmessage->_private);
-  try {
-    (*func)(rk, rkmessage);
-  } catch (...) { }
-  delete func;
-}
 
 bool PublishKafka::configureNewConnection(const std::shared_ptr<core::ProcessContext> &context) {
   std::string value;
@@ -256,14 +556,14 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<core::ProcessCon
   std::string valueConf;
   std::array<char, 512U> errstr{};
   rd_kafka_conf_res_t result;
-  const std::string PREFIX_ERROR_MSG = "PublishKafka: configure error result: ";
+  const char* const PREFIX_ERROR_MSG = "PublishKafka: configure error result: ";
 
   std::unique_ptr<rd_kafka_conf_t, rd_kafka_conf_deleter> conf_{ rd_kafka_conf_new() };
   if (conf_ == nullptr) {
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create rd_kafka_conf_t object");
   }
 
-  auto key = conn_->getKey();
+  const auto* const key = conn_->getKey();
 
   if (key->brokers_.empty()) {
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers");
@@ -451,7 +751,7 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<core::ProcessCon
   }
 
   // Set the delivery callback
-  rd_kafka_conf_set_dr_msg_cb(conf_.get(), &PublishKafka::messageDeliveryCallback);
+  rd_kafka_conf_set_dr_msg_cb(conf_.get(), &messageDeliveryCallback);
 
   // Set the logger callback
   rd_kafka_conf_set_log_cb(conf_.get(), &KafkaConnection::logCallback);
@@ -579,13 +879,13 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
   }
   logger_->log_debug("Processing %lu flow files with a total size of %llu B", flowFiles.size(), actual_bytes);
 
-  auto messages = std::make_shared<Messages>();
+  auto messages = std::make_shared<Messages>(logger_);
   // We must add this to the messages set, so that it will be interrupted when notifyStop is called
   {
     std::lock_guard<std::mutex> lock(messages_mutex_);
     messages_set_.emplace(messages);
   }
-  // We also have to insure that it will be removed once we are done with it
+  // We also have to ensure that it will be removed once we are done with it
   const auto messagesSetGuard = gsl::finally([&]() {
     std::lock_guard<std::mutex> lock(messages_mutex_);
     messages_set_.erase(messages);
@@ -636,8 +936,8 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
     bool failEmptyFlowFiles = true;
     context->getProperty(FailEmptyFlowFiles.getName(), failEmptyFlowFiles);
 
-    PublishKafka::ReadCallback callback(max_flow_seg_size_, kafkaKey, thisTopic->getTopic(), conn_->getConnection(), *flowFile,
-                                        attributeNameRegex_, messages, flow_file_index, failEmptyFlowFiles);
+    ReadCallback callback(max_flow_seg_size_, kafkaKey, thisTopic->getTopic(), conn_->getConnection(), *flowFile,
+                                        attributeNameRegex_, messages, flow_file_index, failEmptyFlowFiles, logger_);
     session->read(flowFile, &callback);
 
     if (!callback.called_) {
@@ -677,20 +977,20 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
       for (size_t segment_num = 0; segment_num < flow_file.messages.size(); segment_num++) {
         const auto& message = flow_file.messages[segment_num];
         switch (message.status) {
-          case MessageStatus::MESSAGESTATUS_UNCOMPLETE:
+          case MessageStatus::InFlight:
             success = false;
             logger_->log_error("Waiting for delivery confirmation was interrupted for flow file %s segment %zu",
                 flowFiles[index]->getUUIDStr(),
                 segment_num);
           break;
-          case MessageStatus::MESSAGESTATUS_ERROR:
+          case MessageStatus::Error:
             success = false;
             logger_->log_error("Failed to deliver flow file %s segment %zu, error: %s",
                 flowFiles[index]->getUUIDStr(),
                 segment_num,
                 rd_kafka_err2str(message.err_code));
           break;
-          case MessageStatus::MESSAGESTATUS_SUCCESS:
+          case MessageStatus::Success:
             logger_->log_debug("Successfully delivered flow file %s segment %zu",
                 flowFiles[index]->getUUIDStr(),
                 segment_num);
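
(Aside, not part of the commit: the opaque-pointer ownership protocol used by
produce() and messageDeliveryCallback above, reduced to a runnable sketch
with hypothetical stand-ins for the librdkafka calls:)

    #include <functional>
    #include <iostream>
    #include <memory>

    using Callback = std::function<void(int /*err*/)>;

    void on_delivery(int err, void* opaque) {  // plays messageDeliveryCallback
      std::unique_ptr<Callback> cb{static_cast<Callback*>(opaque)};
      if (cb) { (*cb)(err); }  // callback freed when cb goes out of scope
    }

    bool enqueue(void* opaque) {      // plays rd_kafka_producev
      on_delivery(0, opaque);         // pretend the broker confirmed delivery
      return true;
    }

    void produce_once(Callback cb) {  // plays ReadCallback::produce
      auto ptr = std::make_unique<Callback>(std::move(cb));
      if (enqueue(ptr.get())) {
        (void)ptr.release();  // success: the delivery callback now owns it
      }                       // failure: ptr still owns the callback and frees it
    }

    int main() {
      produce_once([](int err) { std::cout << "delivered, err=" << err << '\n'; });
    }
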
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index 5d16717..7cc9101 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -50,36 +50,9 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-#define COMPRESSION_CODEC_NONE "none"
-#define COMPRESSION_CODEC_GZIP "gzip"
-#define COMPRESSION_CODEC_SNAPPY "snappy"
-#define ROUND_ROBIN_PARTITIONING "Round Robin"
-#define RANDOM_PARTITIONING "Random Robin"
-#define USER_DEFINED_PARTITIONING "User-Defined"
-#define DELIVERY_REPLICATED "all"
-#define DELIVERY_ONE_NODE "1"
-#define DELIVERY_BEST_EFFORT "0"
-#define SECURITY_PROTOCOL_PLAINTEXT "plaintext"
-#define SECURITY_PROTOCOL_SSL "ssl"
-#define SECURITY_PROTOCOL_SASL_PLAINTEXT "sasl_plaintext"
-#define SECURITY_PROTOCOL_SASL_SSL "sasl_ssl"
-#define KAFKA_KEY_ATTRIBUTE "kafka.key"
-
 // PublishKafka Class
 class PublishKafka : public core::Processor {
  public:
-  // Constructor
-  /*!
-   * Create a new processor
-   */
-  explicit PublishKafka(std::string name, utils::Identifier uuid = utils::Identifier())
-      : core::Processor(std::move(name), uuid),
-        logger_(logging::LoggerFactory<PublishKafka>::getLogger()),
-        interrupted_(false) {
-  }
-
-  virtual ~PublishKafka() = default;
-
   static constexpr char const* ProcessorName = "PublishKafka";
 
   // Supported Properties
@@ -114,228 +87,13 @@ class PublishKafka : public core::Processor {
   static const core::Relationship Failure;
   static const core::Relationship Success;
 
-  // Message
-  enum class MessageStatus : uint8_t {
-    MESSAGESTATUS_UNCOMPLETE,
-    MESSAGESTATUS_ERROR,
-    MESSAGESTATUS_SUCCESS
-  };
-
-  struct MessageResult {
-    MessageStatus status = MessageStatus::MESSAGESTATUS_UNCOMPLETE;
-    rd_kafka_resp_err_t err_code = RD_KAFKA_RESP_ERR_UNKNOWN;
-  };
-
-  struct FlowFileResult {
-    bool flow_file_error = false;
-    std::vector<MessageResult> messages;
-  };
-
-  struct Messages {
-    std::mutex mutex;
-    std::condition_variable cv;
-    std::vector<FlowFileResult> flow_files;
-    bool interrupted = false;
-
-    void waitForCompletion() {
-      std::unique_lock<std::mutex> lock(mutex);
-      cv.wait(lock, [this]() -> bool {
-        if (interrupted) {
-          return true;
-        }
-        size_t index = 0U;
-        return std::all_of(this->flow_files.begin(), this->flow_files.end(), [&](const FlowFileResult& flow_file) {
-          index++;
-          if (flow_file.flow_file_error) {
-            return true;
-          }
-          return std::all_of(flow_file.messages.begin(), flow_file.messages.end(), [](const MessageResult& message) {
-            return message.status != MessageStatus::MESSAGESTATUS_UNCOMPLETE;
-          });
-        });
-      });
-    }
-
-    void modifyResult(size_t index, const std::function<void(FlowFileResult&)>& fun) {
-      std::unique_lock<std::mutex> lock(mutex);
-      fun(flow_files.at(index));
-      cv.notify_all();
-    }
-
-    size_t addFlowFile() {
-      std::lock_guard<std::mutex> lock(mutex);
-      flow_files.emplace_back();
-      return flow_files.size() - 1;
-    }
-
-    void iterateFlowFiles(const std::function<void(size_t /*index*/, const FlowFileResult& /*flow_file_result*/)>& fun) {
-      std::lock_guard<std::mutex> lock(mutex);
-      for (size_t index = 0U; index < flow_files.size(); index++) {
-        fun(index, flow_files[index]);
-      }
-    }
-
-    void interrupt() {
-      std::unique_lock<std::mutex> lock(mutex);
-      interrupted = true;
-      cv.notify_all();
-    }
-
-    bool wasInterrupted() {
-      std::lock_guard<std::mutex> lock(mutex);
-      return interrupted;
-    }
-  };
-
-  // Nest Callback Class for read stream
-  class ReadCallback : public InputStreamCallback {
-   public:
-    struct rd_kafka_headers_deleter {
-      void operator()(rd_kafka_headers_t* ptr) const noexcept {
-        rd_kafka_headers_destroy(ptr);
-      }
-    };
-
-    using rd_kafka_headers_unique_ptr = std::unique_ptr<rd_kafka_headers_t, rd_kafka_headers_deleter>;
-
-   private:
-    void allocate_message_object(const size_t segment_num) const {
-      messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file) {
-        // allocate message object to be filled in by the callback in produce()
-        if (flow_file.messages.size() < segment_num + 1) {
-          flow_file.messages.resize(segment_num + 1);
-        }
-      });
-    }
-
-    static rd_kafka_headers_unique_ptr make_headers(const core::FlowFile& flow_file, utils::Regex& attribute_name_regex) {
-      const gsl::owner<rd_kafka_headers_t*> result{ rd_kafka_headers_new(8) };
-      if (!result) { throw std::bad_alloc{}; }
-
-      for (const auto& kv : flow_file.getAttributes()) {
-        if (attribute_name_regex.match(kv.first)) {
-          rd_kafka_header_add(result, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
-        }
-      }
-      return rd_kafka_headers_unique_ptr{ result };
-    }
-
-    rd_kafka_resp_err_t produce(const size_t segment_num, std::vector<unsigned char>& buffer, const size_t buflen) const {
-      const std::shared_ptr<Messages> messages_ptr_copy = this->messages_;
-      const auto flow_file_index_copy = this->flow_file_index_;
-      const auto produce_callback = [messages_ptr_copy, flow_file_index_copy, segment_num](rd_kafka_t * /*rk*/, const rd_kafka_message_t *rkmessage) {
-        messages_ptr_copy->modifyResult(flow_file_index_copy, [segment_num, rkmessage](FlowFileResult &flow_file) {
-          auto &message = flow_file.messages.at(segment_num);
-          message.err_code = rkmessage->err;
-          message.status = message.err_code == 0 ? MessageStatus::MESSAGESTATUS_SUCCESS : MessageStatus::MESSAGESTATUS_ERROR;
-        });
-      };
-      // release()d below, deallocated in PublishKafka::messageDeliveryCallback
-      auto callback_ptr = utils::make_unique<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>>(std::move(produce_callback));
-
-      allocate_message_object(segment_num);
-
-      const gsl::owner<rd_kafka_headers_t*> hdrs_copy = rd_kafka_headers_copy(hdrs.get());
-      const auto err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
-                                         RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.get()), RD_KAFKA_V_END);
-      if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
-        // in case of failure, messageDeliveryCallback is not called and callback_ptr will delete the callback
-        // in case of success, messageDeliveryCallback takes ownership of the callback, so we no longer need to delete it
-        (void)callback_ptr.release();
-      } else {
-        // in case of failure, rd_kafka_producev doesn't take ownership of the headers, so we need to delete them
-        rd_kafka_headers_destroy(hdrs_copy);
-      }
-      return err;
-    }
-
-   public:
-    ReadCallback(const uint64_t max_seg_size,
-                 std::string key,
-                 rd_kafka_topic_t * const rkt,
-                 rd_kafka_t * const rk,
-                 const core::FlowFile& flowFile,
-                 utils::Regex &attributeNameRegex,
-                 std::shared_ptr<Messages> messages,
-                 const size_t flow_file_index,
-                 const bool fail_empty_flow_files)
-        : flow_size_(flowFile.getSize()),
-          max_seg_size_(max_seg_size == 0 || flow_size_ < max_seg_size ? flow_size_ : max_seg_size),
-          key_(std::move(key)),
-          rkt_(rkt),
-          rk_(rk),
-          hdrs(make_headers(flowFile, attributeNameRegex)),
-          messages_(std::move(messages)),
-          flow_file_index_(flow_file_index),
-          fail_empty_flow_files_(fail_empty_flow_files)
-    { }
-
-    int64_t process(const std::shared_ptr<io::BaseStream> stream) {
-      std::vector<unsigned char> buffer;
-
-      buffer.resize(max_seg_size_);
-      read_size_ = 0;
-      status_ = 0;
-      called_ = true;
-
-      assert(max_seg_size_ != 0 || flow_size_ == 0 && "max_seg_size_ == 0 implies flow_size_ == 0");
-      // ^^ therefore checking max_seg_size_ == 0 handles both division by zero and flow_size_ == 0 cases
-      const size_t reserved_msg_capacity = max_seg_size_ == 0 ? 1 : utils::intdiv_ceil(flow_size_, max_seg_size_);
-      messages_->modifyResult(flow_file_index_, [reserved_msg_capacity](FlowFileResult& flow_file) {
-        flow_file.messages.reserve(reserved_msg_capacity);
-      });
-
-      // If the flow file is empty, we still want to send the message, unless the user wants to fail_empty_flow_files_
-      if (flow_size_ == 0 && !fail_empty_flow_files_) {
-        produce(0, buffer, 0);
-        return 0;
-      }
-
-      for (size_t segment_num = 0; read_size_ < flow_size_; ++segment_num) {
-        const int readRet = stream->read(buffer.data(), buffer.size());
-        if (readRet < 0) {
-          status_ = -1;
-          error_ = "Failed to read from stream";
-          return read_size_;
-        }
-
-        if (readRet <= 0) { break; }
+  explicit PublishKafka(std::string name, utils::Identifier uuid = utils::Identifier())
+      : core::Processor(std::move(name), uuid) {
+  }
 
-        const auto err = produce(segment_num, buffer, readRet);
-        if (err) {
-          messages_->modifyResult(flow_file_index_, [segment_num, err](FlowFileResult& flow_file) {
-            auto& message = flow_file.messages.at(segment_num);
-            message.status = MessageStatus::MESSAGESTATUS_ERROR;
-            message.err_code = err;
-          });
-          status_ = -1;
-          error_ = rd_kafka_err2str(err);
-          return read_size_;
-        }
-        read_size_ += readRet;
-      }
-      return read_size_;
-    }
+  ~PublishKafka() override = default;
 
-    const uint64_t flow_size_ = 0;
-    const uint64_t max_seg_size_ = 0;
-    const std::string key_;
-    rd_kafka_topic_t * const rkt_ = nullptr;
-    rd_kafka_t * const rk_ = nullptr;
-    const rd_kafka_headers_unique_ptr hdrs;  // not null
-    const std::shared_ptr<Messages> messages_;
-    const size_t flow_file_index_;
-    int status_ = 0;
-    std::string error_;
-    int read_size_ = 0;
-    bool called_ = false;
-    const bool fail_empty_flow_files_ = true;
-  };
-
- public:
-  bool supportsDynamicProperties() override {
-    return true;
-  }
+  bool supportsDynamicProperties() override { return true; }
 
   /**
    * Function that's executed when the processor is scheduled.
@@ -348,25 +106,25 @@ class PublishKafka : public core::Processor {
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
   void notifyStop() override;
 
+  class Messages;
+
  protected:
   bool configureNewConnection(const std::shared_ptr<core::ProcessContext> &context);
   bool createNewTopic(const std::shared_ptr<core::ProcessContext> &context, const std::string& topic_name);
 
  private:
-  static void messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque);
-
-  std::shared_ptr<logging::Logger> logger_;
+  std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<PublishKafka>::getLogger()};
 
   KafkaConnectionKey key_;
   std::unique_ptr<KafkaConnection> conn_;
   std::mutex connection_mutex_;
 
-  uint32_t batch_size_;
-  uint64_t target_batch_payload_size_;
-  uint64_t max_flow_seg_size_;
+  uint32_t batch_size_{};
+  uint64_t target_batch_payload_size_{};
+  uint64_t max_flow_seg_size_{};
   utils::Regex attributeNameRegex_;
 
-  std::atomic<bool> interrupted_;
+  std::atomic<bool> interrupted_{false};
   std::mutex messages_mutex_;
   std::set<std::shared_ptr<Messages>> messages_set_;
 };
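
One subtlety worth spelling out from ReadCallback::produce() above:
librdkafka supports only a single delivery-report callback per producer, so
a per-message std::function is smuggled through the message opaque and must
be freed exactly once. A minimal sketch of the receiving end (assumed
shape; the real messageDeliveryCallback moved out of this header into
PublishKafka.cpp):

    #include <functional>
    #include <memory>
    #include <librdkafka/rdkafka.h>

    // The opaque passed via RD_KAFKA_V_OPAQUE(...) surfaces on the
    // delivery report as rkmessage->_private. Taking it into a unique_ptr
    // completes the ownership handoff started by callback_ptr.release()
    // in produce(); when producev() fails, that release never happens and
    // the unique_ptr in produce() frees the callback instead.
    void messageDeliveryCallback(rd_kafka_t* rk,
                                 const rd_kafka_message_t* rkmessage,
                                 void* /*conf_opaque*/) {
      using DeliveryFn = std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>;
      std::unique_ptr<DeliveryFn> fn{static_cast<DeliveryFn*>(rkmessage->_private)};
      if (fn && *fn) {
        (*fn)(rk, rkmessage);
      }
    }  // fn destroyed here, so the callback is deleted exactly once
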
diff --git a/win_build_vs.bat b/win_build_vs.bat
index a2beefa..3a24778 100644
--- a/win_build_vs.bat
+++ b/win_build_vs.bat
@@ -64,6 +64,9 @@ for %%x in (%*) do (
     if [%%~x] EQU [/D] (
         set cmake_build_type=RelWithDebInfo
     )
+    if [%%~x] EQU [/DD] (
+        set cmake_build_type=Debug
+    )
     if [%%~x] EQU [/CI] (
         set "strict_gsl_checks=-DSTRICT_GSL_CHECKS=AUDIT"
     )
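
The new switch slots in next to the existing build-type flags: /D still
selects RelWithDebInfo, while the new /DD sets cmake_build_type=Debug for
the rest of the script. A hypothetical invocation (the build-directory
argument is assumed; see the script's usage text for the exact arguments):

    win_build_vs.bat build /DD
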