You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2019/09/26 08:40:44 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-1033 - PublishKafka fixes

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new baac0a0  MINIFICPP-1033 - PublishKafka fixes
baac0a0 is described below

commit baac0a08af4db0f8169e68e4618f021716f41338
Author: Daniel Bakai <ba...@gmail.com>
AuthorDate: Wed Sep 25 04:43:34 2019 +0200

    MINIFICPP-1033 - PublishKafka fixes
    
    Fixes:
     - We now use delivery completion callbacks to properly determine successful delivery
     - The 'Delivery Guarantee' property had an invalid default value
     - Most of the configuration errors from librdkafka were silently ignored
     - rd_kafka_conf_t and rd_kafka_topic_conf_t objects were improperly destructed
       after their ownership has been taken by rd_kafka_t and rd_kafka_topic_t, respectively
    
    MINIFICPP-1033 - SSL fixes
    
    Signed-off-by: Arpad Boda <ab...@apche.org>
    
    This closes #653
---
 NOTICE                                          |   5 +
 cmake/FindPatch.cmake                           |  69 +++
 extensions/librdkafka/CMakeLists.txt            |   9 +-
 extensions/librdkafka/KafkaConnection.cpp       |  21 +-
 extensions/librdkafka/KafkaConnection.h         |  31 +-
 extensions/librdkafka/KafkaTopic.h              |  14 +-
 extensions/librdkafka/PublishKafka.cpp          | 627 +++++++++++++++++-------
 extensions/librdkafka/PublishKafka.h            | 166 ++++++-
 thirdparty/librdkafka/librdkafka-libressl.patch | 148 ++++++
 9 files changed, 860 insertions(+), 230 deletions(-)

diff --git a/NOTICE b/NOTICE
index 252860b..7dcbc81 100644
--- a/NOTICE
+++ b/NOTICE
@@ -29,3 +29,8 @@ Copyright (c) 1996 - 2019, Daniel Stenberg, <da...@haxx.se>, and many contribut
 The derived work is adapted from
   CMake/FindLibSSH2.cmake
 and can be found in cmake/libssh2/sys/FindLibSSH2.cmake
+
+This includes derived works from the CMake (BSD 3-Clause licensed) project (https://github.com/Kitware/CMake):
+Copyright 2000-2019 Kitware, Inc. and Contributors
+The derived work is adapted from
+  Modules/FindPatch.cmake
diff --git a/cmake/FindPatch.cmake b/cmake/FindPatch.cmake
new file mode 100644
index 0000000..a81e0eb
--- /dev/null
+++ b/cmake/FindPatch.cmake
@@ -0,0 +1,69 @@
+# Distributed under the OSI-approved BSD 3-Clause License.  See accompanying
+# file Copyright.txt or https://cmake.org/licensing for details.
+
+#[=======================================================================[.rst:
+FindPatch
+---------
+
+The module defines the following variables:
+
+``Patch_EXECUTABLE``
+  Path to patch command-line executable.
+``Patch_FOUND``
+  True if the patch command-line executable was found.
+
+The following :prop_tgt:`IMPORTED` targets are also defined:
+
+``Patch::patch``
+  The command-line executable.
+
+Example usage:
+
+.. code-block:: cmake
+
+   find_package(Patch)
+   if(Patch_FOUND)
+     message("Patch found: ${Patch_EXECUTABLE}")
+   endif()
+#]=======================================================================]
+
+set(_doc "Patch command line executable")
+set(_patch_path )
+
+if(CMAKE_HOST_WIN32)
+  set(_patch_path
+    "$ENV{LOCALAPPDATA}/Programs/Git/bin"
+    "$ENV{LOCALAPPDATA}/Programs/Git/usr/bin"
+    "$ENV{APPDATA}/Programs/Git/bin"
+    "$ENV{APPDATA}/Programs/Git/usr/bin"
+    )
+endif()
+
+# First search the PATH
+find_program(Patch_EXECUTABLE
+  NAME patch
+  PATHS ${_patch_path}
+  DOC ${_doc}
+  )
+
+if(CMAKE_HOST_WIN32)
+  # Now look for installations in Git/ directories under typical installation
+  # prefixes on Windows.
+  find_program(Patch_EXECUTABLE
+    NAMES patch
+    PATH_SUFFIXES Git/usr/bin Git/bin GnuWin32/bin
+    DOC ${_doc}
+    )
+endif()
+
+if(Patch_EXECUTABLE AND NOT TARGET Patch::patch)
+  add_executable(Patch::patch IMPORTED)
+  set_property(TARGET Patch::patch PROPERTY IMPORTED_LOCATION ${Patch_EXECUTABLE})
+endif()
+
+unset(_patch_path)
+unset(_doc)
+
+include(FindPackageHandleStandardArgs)
+find_package_handle_standard_args(Patch
+                                  REQUIRED_VARS Patch_EXECUTABLE)
diff --git a/extensions/librdkafka/CMakeLists.txt b/extensions/librdkafka/CMakeLists.txt
index 769b0e3..aa74bbf 100644
--- a/extensions/librdkafka/CMakeLists.txt
+++ b/extensions/librdkafka/CMakeLists.txt
@@ -43,16 +43,19 @@ if(WIN32 OR NOT USE_SYSTEM_ZLIB)
 endif()
 string(REPLACE ";" "%" CMAKE_MODULE_PATH_PASSTHROUGH "${CMAKE_MODULE_PATH_PASSTHROUGH_LIST}")
 
+find_package(Patch REQUIRED)
+
 ExternalProject_Add(
     kafka-external
-    GIT_REPOSITORY "https://github.com/edenhill/librdkafka.git"
-    GIT_TAG "v1.0.1" 
+    URL "https://github.com/edenhill/librdkafka/archive/v1.0.1.tar.gz"
+    URL_HASH "SHA256=b2a2defa77c0ef8c508739022a197886e0644bd7bf6179de1b68bdffb02b3550"
+    PATCH_COMMAND "${Patch_EXECUTABLE}" -p1 -i "${CMAKE_SOURCE_DIR}/thirdparty/librdkafka/librdkafka-libressl.patch"
     PREFIX "${BASE_DIR}"
     LIST_SEPARATOR % # This is needed for passing semicolon-separated lists
     CMAKE_ARGS ${PASSTHROUGH_CMAKE_ARGS}
                "-DCMAKE_INSTALL_PREFIX=${BASE_DIR}/install"
                "-DWITH_SASL=OFF"
-               "-DOPENSSL_VERSION=1.0.2"
+               "-DWITH_SSL=ON"
                "-DRDKAFKA_BUILD_STATIC=ON"
                "-DRDKAFKA_BUILD_EXAMPLES=OFF"
                "-DRDKAFKA_BUILD_TESTS=OFF"
diff --git a/extensions/librdkafka/KafkaConnection.cpp b/extensions/librdkafka/KafkaConnection.cpp
index 28b0b45..04b327d 100644
--- a/extensions/librdkafka/KafkaConnection.cpp
+++ b/extensions/librdkafka/KafkaConnection.cpp
@@ -25,19 +25,24 @@ namespace processors {
 
 KafkaConnection::KafkaConnection(const KafkaConnectionKey &key)
     : logger_(logging::LoggerFactory<KafkaConnection>::getLogger()),
-      conf_(nullptr),
-      kafka_connection_(nullptr) {
+      kafka_connection_(nullptr),
+      poll_(false) {
   lease_ = false;
   initialized_ = false;
   key_ = key;
 }
 
+KafkaConnection::~KafkaConnection() {
+  remove();
+}
+
 void KafkaConnection::remove() {
   topics_.clear();
   removeConnection();
 }
 
 void KafkaConnection::removeConnection() {
+  stopPoll();
   if (kafka_connection_) {
     rd_kafka_flush(kafka_connection_, 10 * 1000); /* wait for max 10 seconds */
     rd_kafka_destroy(kafka_connection_);
@@ -46,10 +51,6 @@ void KafkaConnection::removeConnection() {
     });
     kafka_connection_ = nullptr;
   }
-  if (conf_) {
-    rd_kafka_conf_destroy(conf_);
-    conf_ = nullptr;
-  }
   initialized_ = false;
 }
 
@@ -57,18 +58,14 @@ bool KafkaConnection::initialized() const {
   return initialized_;
 }
 
-void KafkaConnection::setConnection(rd_kafka_t *producer, rd_kafka_conf_t *conf) {
+void KafkaConnection::setConnection(rd_kafka_t *producer) {
   removeConnection();
   kafka_connection_ = producer;
-  conf_ = conf;
   initialized_ = true;
   modifyLoggers([&](std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>>& loggers) {
     loggers[producer] = logger_;
   });
-}
-
-rd_kafka_conf_t *KafkaConnection::getConf() const {
-  return conf_;
+  startPoll();
 }
 
 rd_kafka_t *KafkaConnection::getConnection() const {
diff --git a/extensions/librdkafka/KafkaConnection.h b/extensions/librdkafka/KafkaConnection.h
index 6e158fe..774754b 100644
--- a/extensions/librdkafka/KafkaConnection.h
+++ b/extensions/librdkafka/KafkaConnection.h
@@ -18,6 +18,7 @@
 #ifndef NIFI_MINIFI_CPP_KAFKACONNECTION_H
 #define NIFI_MINIFI_CPP_KAFKACONNECTION_H
 
+#include <atomic>
 #include <mutex>
 #include <string>
 #include "core/logging/LoggerConfiguration.h"
@@ -46,9 +47,7 @@ class KafkaConnection {
 
   explicit KafkaConnection(const KafkaConnectionKey &key);
 
-  ~KafkaConnection() {
-    remove();
-  }
+  ~KafkaConnection();
 
   void remove();
 
@@ -56,9 +55,7 @@ class KafkaConnection {
 
   bool initialized() const;
 
-  void setConnection(rd_kafka_t *producer, rd_kafka_conf_t *conf);
-
-  rd_kafka_conf_t *getConf() const;
+  void setConnection(rd_kafka_t *producer);
 
   rd_kafka_t *getConnection() const;
 
@@ -90,9 +87,11 @@ class KafkaConnection {
 
   std::map<std::string, std::shared_ptr<KafkaTopic>> topics_;
 
-  rd_kafka_conf_t *conf_;
   rd_kafka_t *kafka_connection_;
 
+  std::atomic<bool> poll_;
+  std::thread thread_kafka_poll_;
+
   static void modifyLoggers(const std::function<void(std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>>&)>& func) {
     static std::mutex loggers_mutex;
     static std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>> loggers;
@@ -100,6 +99,24 @@ class KafkaConnection {
     std::lock_guard<std::mutex> lock(loggers_mutex);
     func(loggers);
   }
+
+  void stopPoll() {
+    poll_ = false;
+    logger_->log_debug("Stop polling");
+    if (thread_kafka_poll_.joinable()) {
+      thread_kafka_poll_.join();
+    }
+  }
+
+  void startPoll() {
+    poll_ = true;
+    logger_->log_debug("Start polling");
+    thread_kafka_poll_ = std::thread([this]{
+        while (this->poll_) {
+          rd_kafka_poll(this->kafka_connection_, 1000);
+        }
+    });
+  }
 };
 
 class KafkaLease {
diff --git a/extensions/librdkafka/KafkaTopic.h b/extensions/librdkafka/KafkaTopic.h
index 30c34a0..f7b8f6e 100644
--- a/extensions/librdkafka/KafkaTopic.h
+++ b/extensions/librdkafka/KafkaTopic.h
@@ -28,9 +28,8 @@ namespace processors {
 
 class KafkaTopic {
  public:
-  KafkaTopic(rd_kafka_topic_t *topic_reference, rd_kafka_topic_conf_t *topic_conf)
-      : topic_conf_(topic_conf),
-        topic_reference_(topic_reference) {
+  KafkaTopic(rd_kafka_topic_t *topic_reference)
+      : topic_reference_(topic_reference) {
 
   }
 
@@ -38,21 +37,12 @@ class KafkaTopic {
     if (topic_reference_) {
       rd_kafka_topic_destroy(topic_reference_);
     }
-    if (topic_conf_) {
-      rd_kafka_topic_conf_destroy(topic_conf_);
-    }
-  }
-
-  rd_kafka_topic_conf_t *getTopicConf() const {
-    return topic_conf_;
   }
-
   rd_kafka_topic_t *getTopic() const {
     return topic_reference_;
   }
 
  private:
-  rd_kafka_topic_conf_t *topic_conf_;
   rd_kafka_topic_t *topic_reference_;
 };
 
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index b248c1e..75f8839 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -26,6 +26,7 @@
 #include <set>
 #include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
+#include "utils/ScopeGuard.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 
@@ -36,37 +37,51 @@ namespace minifi {
 namespace processors {
 
 core::Property PublishKafka::SeedBrokers(
-    core::PropertyBuilder::createProperty("Known Brokers")->withDescription("A comma-separated list of known Kafka Brokers in the format <host>:<port>")->isRequired(true)->supportsExpressionLanguage(
-        true)->build());
+    core::PropertyBuilder::createProperty("Known Brokers")->withDescription("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
+        ->isRequired(true)->supportsExpressionLanguage(true)->build());
 
-core::Property PublishKafka::Topic(core::PropertyBuilder::createProperty("Topic Name")->withDescription("The Kafka Topic of interest")->isRequired(true)->supportsExpressionLanguage(true)->build());
+core::Property PublishKafka::Topic(
+    core::PropertyBuilder::createProperty("Topic Name")->withDescription("The Kafka Topic of interest")
+        ->isRequired(true)->supportsExpressionLanguage(true)->build());
 
 core::Property PublishKafka::DeliveryGuarantee(
-    core::PropertyBuilder::createProperty("Delivery Guarantee")->withDescription("Specifies the requirement for guaranteeing that a message is sent to Kafka")->isRequired(false)
-        ->supportsExpressionLanguage(true)->withDefaultValue("DELIVERY_ONE_NODE")->build());
+    core::PropertyBuilder::createProperty("Delivery Guarantee")->withDescription("Specifies the requirement for guaranteeing that a message is sent to Kafka")
+        ->isRequired(false)->supportsExpressionLanguage(true)->withDefaultValue(DELIVERY_ONE_NODE)->build());
 
-core::Property PublishKafka::MaxMessageSize(core::PropertyBuilder::createProperty("Max Request Size")->withDescription("Maximum Kafka protocol request message size")->isRequired(false)->build());
+core::Property PublishKafka::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Max Request Size")->withDescription("Maximum Kafka protocol request message size")
+        ->isRequired(false)->build());
 
 core::Property PublishKafka::RequestTimeOut(
-    core::PropertyBuilder::createProperty("Request Timeout")->withDescription("The ack timeout of the producer request in milliseconds")->isRequired(false)->withDefaultValue<core::TimePeriodValue>(
-        "10 sec")->supportsExpressionLanguage(true)->build());
+    core::PropertyBuilder::createProperty("Request Timeout")->withDescription("The ack timeout of the producer request")
+        ->isRequired(false)->withDefaultValue<core::TimePeriodValue>("10 sec")->supportsExpressionLanguage(true)->build());
+
+core::Property PublishKafka::MessageTimeOut(
+    core::PropertyBuilder::createProperty("Message Timeout")->withDescription("The total time sending a message could take")
+        ->isRequired(false)->withDefaultValue<core::TimePeriodValue>("30 sec")->supportsExpressionLanguage(true)->build());
 
 core::Property PublishKafka::ClientName(
-    core::PropertyBuilder::createProperty("Client Name")->withDescription("Client Name to use when communicating with Kafka")->isRequired(true)->supportsExpressionLanguage(true)->build());
+    core::PropertyBuilder::createProperty("Client Name")->withDescription("Client Name to use when communicating with Kafka")
+        ->isRequired(true)->supportsExpressionLanguage(true)->build());
 
 /**
  * These don't appear to need EL support
  */
 
 core::Property PublishKafka::BatchSize(
-    core::PropertyBuilder::createProperty("Batch Size")->withDescription("Maximum number of messages batched in one MessageSet")->isRequired(false)->withDefaultValue<uint32_t>(10)->build());
-
+    core::PropertyBuilder::createProperty("Batch Size")->withDescription("Maximum number of messages batched in one MessageSet")
+        ->isRequired(false)->withDefaultValue<uint32_t>(10)->build());
+core::Property PublishKafka::TargetBatchPayloadSize(
+    core::PropertyBuilder::createProperty("Target Batch Payload Size")->withDescription("The target total payload size for a batch. 0 B means unlimited (Batch Size is still applied).")
+        ->isRequired(false)->withDefaultValue<core::DataSizeValue>("512 KB")->build());
 core::Property PublishKafka::AttributeNameRegex("Attributes to Send as Headers", "Any attribute whose name matches the regex will be added to the Kafka messages as a Header", "");
 core::Property PublishKafka::QueueBufferMaxTime("Queue Buffering Max Time", "Delay to wait for messages in the producer queue to accumulate before constructing message batches", "");
 core::Property PublishKafka::QueueBufferMaxSize("Queue Max Buffer Size", "Maximum total message size sum allowed on the producer queue", "");
 core::Property PublishKafka::QueueBufferMaxMessage("Queue Max Message", "Maximum number of messages allowed on the producer queue", "");
 core::Property PublishKafka::CompressCodec("Compress Codec", "compression codec to use for compressing message sets", COMPRESSION_CODEC_NONE);
-core::Property PublishKafka::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the kafka record", "");
+core::Property PublishKafka::MaxFlowSegSize(
+    core::PropertyBuilder::createProperty("Max Flow Segment Size")->withDescription("Maximum flow content payload segment size for the kafka record. 0 B means unlimited.")
+        ->isRequired(false)->withDefaultValue<core::DataSizeValue>("0 B")->build());
 core::Property PublishKafka::SecurityProtocol("Security Protocol", "Protocol used to communicate with brokers", "");
 core::Property PublishKafka::SecurityCA("Security CA", "File or directory path to CA certificate(s) for verifying the broker's key", "");
 core::Property PublishKafka::SecurityCert("Security Cert", "Path to client's public key (PEM) used for authentication", "");
@@ -81,6 +96,7 @@ core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of a
                                              "");
 core::Property PublishKafka::DebugContexts("Debug contexts", "A comma-separated list of debug contexts to enable."
                                            "Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all", "");
+
 core::Relationship PublishKafka::Success("success", "Any FlowFile that is successfully sent to Kafka will be routed to this Relationship");
 core::Relationship PublishKafka::Failure("failure", "Any FlowFile that cannot be sent to Kafka will be routed to this Relationship");
 
@@ -92,9 +108,11 @@ void PublishKafka::initialize() {
   properties.insert(DeliveryGuarantee);
   properties.insert(MaxMessageSize);
   properties.insert(RequestTimeOut);
+  properties.insert(MessageTimeOut);
   properties.insert(ClientName);
   properties.insert(AttributeNameRegex);
   properties.insert(BatchSize);
+  properties.insert(TargetBatchPayloadSize);
   properties.insert(QueueBufferMaxTime);
   properties.insert(QueueBufferMaxSize);
   properties.insert(QueueBufferMaxMessage);
@@ -119,165 +137,220 @@ void PublishKafka::initialize() {
 }
 
 void PublishKafka::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  interrupted_ = false;
+}
+
+void PublishKafka::notifyStop() {
+  logger_->log_debug("notifyStop called");
+  interrupted_ = true;
+  std::lock_guard<std::mutex> lock(messages_mutex_);
+  for (auto& messages : messages_set_) {
+    messages->interrupt();
+  }
 }
 
-bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &ff) {
+/**
+ * Message delivery report callback using the richer rd_kafka_message_t object.
+ */
+void PublishKafka::messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* /*opaque*/) {
+  if (rkmessage->_private == nullptr) {
+    return;
+  }
+  std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>* func =
+    reinterpret_cast<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>*>(rkmessage->_private);
+  try {
+    (*func)(rk, rkmessage);
+  } catch (...) {
+  }
+  delete func;
+}
+
+bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context) {
   std::string value;
   int64_t valInt;
   std::string valueConf;
-  char errstr[512];
+  std::array<char, 512U> errstr;
   rd_kafka_conf_res_t result;
 
-  auto conf_ = rd_kafka_conf_new();
+  rd_kafka_conf_t* conf_ = rd_kafka_conf_new();
+  if (conf_ == nullptr) {
+    logger_->log_error("Failed to create rd_kafka_conf_t object");
+    return false;
+  }
+  utils::ScopeGuard confGuard([conf_](){
+    rd_kafka_conf_destroy(conf_);
+  });
 
   auto key = conn->getKey();
 
-  if (context->getProperty(DebugContexts.getName(), value) && !value.empty()) {
-    rd_kafka_conf_set(conf_, "debug", value.c_str(), errstr, sizeof(errstr));
-    logger_->log_debug("PublishKafka: debug properties [%s]", value);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PublishKafka: configure debug properties error result [%s]", errstr);
-  }
-
-  if (!key->brokers_.empty()) {
-    result = rd_kafka_conf_set(conf_, "bootstrap.servers", key->brokers_.c_str(), errstr, sizeof(errstr));
-    logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
-  } else {
+  if (key->brokers_.empty()) {
     logger_->log_error("There are no brokers");
     return false;
   }
+  result = rd_kafka_conf_set(conf_, "bootstrap.servers", key->brokers_.c_str(), errstr.data(), errstr.size());
+  logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_);
+  if (result != RD_KAFKA_CONF_OK) {
+    logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+    return false;
+  }
 
-  if (!key->client_id_.empty()) {
-    rd_kafka_conf_set(conf_, "client.id", key->client_id_.c_str(), errstr, sizeof(errstr));
-    logger_->log_debug("PublishKafka: client.id [%s]", key->client_id_);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
-  } else {
+  if (key->client_id_.empty()) {
     logger_->log_error("Client id is empty");
     return false;
   }
+  result = rd_kafka_conf_set(conf_, "client.id", key->client_id_.c_str(), errstr.data(), errstr.size());
+  logger_->log_debug("PublishKafka: client.id [%s]", key->client_id_);
+  if (result != RD_KAFKA_CONF_OK) {
+    logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+    return false;
+  }
 
-// Kerberos configuration
+  value = "";
+  if (context->getProperty(DebugContexts.getName(), value) && !value.empty()) {
+    result = rd_kafka_conf_set(conf_, "debug", value.c_str(), errstr.data(), errstr.size());
+    logger_->log_debug("PublishKafka: debug [%s]", value);
+    if (result != RD_KAFKA_CONF_OK) {
+      logger_->log_error("PublishKafka: configure debug error result [%s]", errstr.data());
+      return false;
+    }
+  }
+  value = "";
   if (context->getProperty(KerberosServiceName.getName(), value) && !value.empty()) {
-    result = rd_kafka_conf_set(conf_, "sasl.kerberos.service.name", value.c_str(), errstr, sizeof(errstr));
+    result = rd_kafka_conf_set(conf_, "sasl.kerberos.service.name", value.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: sasl.kerberos.service.name [%s]", value);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+    if (result != RD_KAFKA_CONF_OK) {
+      logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+      return false;
+    }
   }
   value = "";
   if (context->getProperty(KerberosPrincipal.getName(), value) && !value.empty()) {
-    result = rd_kafka_conf_set(conf_, "sasl.kerberos.principal", value.c_str(), errstr, sizeof(errstr));
+    result = rd_kafka_conf_set(conf_, "sasl.kerberos.principal", value.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: sasl.kerberos.principal [%s]", value);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+    if (result != RD_KAFKA_CONF_OK) {
+      logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+      return false;
+    }
   }
   value = "";
   if (context->getProperty(KerberosKeytabPath.getName(), value) && !value.empty()) {
-    result = rd_kafka_conf_set(conf_, "sasl.kerberos.keytab", value.c_str(), errstr, sizeof(errstr));
+    result = rd_kafka_conf_set(conf_, "sasl.kerberos.keytab", value.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: sasl.kerberos.keytab [%s]", value);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+    if (result != RD_KAFKA_CONF_OK) {
+      logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+      return false;
+    }
   }
 
   value = "";
   if (context->getProperty(MaxMessageSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
     valueConf = std::to_string(valInt);
-    result = rd_kafka_conf_set(conf_, "message.max.bytes", valueConf.c_str(), errstr, sizeof(errstr));
+    result = rd_kafka_conf_set(conf_, "message.max.bytes", valueConf.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: message.max.bytes [%s]", valueConf);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+    if (result != RD_KAFKA_CONF_OK) {
+      logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+      return false;
+    }
   }
   value = "";
   if (context->getProperty(QueueBufferMaxMessage.getName(), value) && !value.empty()) {
-    rd_kafka_conf_set(conf_, "queue.buffering.max.messages", value.c_str(), errstr, sizeof(errstr));
+    result = rd_kafka_conf_set(conf_, "queue.buffering.max.messages", value.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: queue.buffering.max.messages [%s]", value);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
-  }
-  value = "";
-  if (context->getProperty(AttributeNameRegex.getName(), value) && !value.empty()) {
-    attributeNameRegex.assign(value);
-    logger_->log_debug("PublishKafka: AttributeNameRegex %s", value);
+    if (result != RD_KAFKA_CONF_OK) {
+      logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+      return false;
+    }
   }
   value = "";
   if (context->getProperty(QueueBufferMaxSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
     valInt = valInt / 1024;
     valueConf = std::to_string(valInt);
-    rd_kafka_conf_set(conf_, "queue.buffering.max.kbytes", valueConf.c_str(), errstr, sizeof(errstr));
+    result = rd_kafka_conf_set(conf_, "queue.buffering.max.kbytes", valueConf.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: queue.buffering.max.kbytes [%s]", valueConf);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
-  }
-  value = "";
-  max_seg_size_ = ULLONG_MAX;
-  if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
-    max_seg_size_ = valInt;
-    logger_->log_debug("PublishKafka: max flow segment size [%llu]", max_seg_size_);
+    if (result != RD_KAFKA_CONF_OK) {
+      logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+      return false;
+    }
   }
   value = "";
   if (context->getProperty(QueueBufferMaxTime.getName(), value) && !value.empty()) {
     core::TimeUnit unit;
     if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
       valueConf = std::to_string(valInt);
-      rd_kafka_conf_set(conf_, "queue.buffering.max.ms", valueConf.c_str(), errstr, sizeof(errstr));
+      result = rd_kafka_conf_set(conf_, "queue.buffering.max.ms", valueConf.c_str(), errstr.data(), errstr.size());
       logger_->log_debug("PublishKafka: queue.buffering.max.ms [%s]", valueConf);
-      if (result != RD_KAFKA_CONF_OK)
-        logger_->log_error("PublishKafka: configure queue buffer error result [%s]", errstr);
+      if (result != RD_KAFKA_CONF_OK) {
+        logger_->log_error("PublishKafka: configure queue buffer error result [%s]", errstr.data());
+        return false;
+      }
     }
   }
   value = "";
   if (context->getProperty(BatchSize.getName(), value) && !value.empty()) {
-    rd_kafka_conf_set(conf_, "batch.num.messages", value.c_str(), errstr, sizeof(errstr));
+    result = rd_kafka_conf_set(conf_, "batch.num.messages", value.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: batch.num.messages [%s]", value);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PublishKafka: configure batch size error result [%s]", errstr);
+    if (result != RD_KAFKA_CONF_OK) {
+      logger_->log_error("PublishKafka: configure batch size error result [%s]", errstr.data());
+      return false;
+    }
   }
   value = "";
   if (context->getProperty(CompressCodec.getName(), value) && !value.empty() && value != "none") {
-    rd_kafka_conf_set(conf_, "compression.codec", value.c_str(), errstr, sizeof(errstr));
+    result = rd_kafka_conf_set(conf_, "compression.codec", value.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: compression.codec [%s]", value);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PublishKafka: configure compression codec error result [%s]", errstr);
+    if (result != RD_KAFKA_CONF_OK) {
+      logger_->log_error("PublishKafka: configure compression codec error result [%s]", errstr.data());
+      return false;
+    }
   }
   value = "";
   if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) {
     if (value == SECURITY_PROTOCOL_SSL) {
-      rd_kafka_conf_set(conf_, "security.protocol", value.c_str(), errstr, sizeof(errstr));
+      result = rd_kafka_conf_set(conf_, "security.protocol", value.c_str(), errstr.data(), errstr.size());
       logger_->log_debug("PublishKafka: security.protocol [%s]", value);
       if (result != RD_KAFKA_CONF_OK) {
-        logger_->log_error("PublishKafka: configure error result [%s]", errstr);
-      } else {
-        value = "";
-        if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) {
-          rd_kafka_conf_set(conf_, "ssl.ca.location", value.c_str(), errstr, sizeof(errstr));
-          logger_->log_debug("PublishKafka: ssl.ca.location [%s]", value);
-          if (result != RD_KAFKA_CONF_OK)
-            logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+        logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+        return false;
+      }
+      value = "";
+      if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) {
+        result = rd_kafka_conf_set(conf_, "ssl.ca.location", value.c_str(), errstr.data(), errstr.size());
+        logger_->log_debug("PublishKafka: ssl.ca.location [%s]", value);
+        if (result != RD_KAFKA_CONF_OK) {
+          logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+          return false;
         }
-        value = "";
-        if (context->getProperty(SecurityCert.getName(), value) && !value.empty()) {
-          rd_kafka_conf_set(conf_, "ssl.certificate.location", value.c_str(), errstr, sizeof(errstr));
-          logger_->log_debug("PublishKafka: ssl.certificate.location [%s]", value);
-          if (result != RD_KAFKA_CONF_OK)
-            logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+      }
+      value = "";
+      if (context->getProperty(SecurityCert.getName(), value) && !value.empty()) {
+        result = rd_kafka_conf_set(conf_, "ssl.certificate.location", value.c_str(), errstr.data(), errstr.size());
+        logger_->log_debug("PublishKafka: ssl.certificate.location [%s]", value);
+        if (result != RD_KAFKA_CONF_OK) {
+          logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+          return false;
         }
-        value = "";
-        if (context->getProperty(SecurityPrivateKey.getName(), value) && !value.empty()) {
-          rd_kafka_conf_set(conf_, "ssl.key.location", value.c_str(), errstr, sizeof(errstr));
-          logger_->log_debug("PublishKafka: ssl.key.location [%s]", value);
-          if (result != RD_KAFKA_CONF_OK)
-            logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+      }
+      value = "";
+      if (context->getProperty(SecurityPrivateKey.getName(), value) && !value.empty()) {
+        result = rd_kafka_conf_set(conf_, "ssl.key.location", value.c_str(), errstr.data(), errstr.size());
+        logger_->log_debug("PublishKafka: ssl.key.location [%s]", value);
+        if (result != RD_KAFKA_CONF_OK) {
+          logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+          return false;
         }
-        value = "";
-        if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && !value.empty()) {
-          rd_kafka_conf_set(conf_, "ssl.key.password", value.c_str(), errstr, sizeof(errstr));
-          logger_->log_debug("PublishKafka: ssl.key.password [%s]", value);
-          if (result != RD_KAFKA_CONF_OK)
-            logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+      }
+      value = "";
+      if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && !value.empty()) {
+        result = rd_kafka_conf_set(conf_, "ssl.key.password", value.c_str(), errstr.data(), errstr.size());
+        logger_->log_debug("PublishKafka: ssl.key.password [%s]", value);
+        if (result != RD_KAFKA_CONF_OK) {
+          logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+          return false;
         }
       }
+    } else {
+      logger_->log_error("PublishKafka: unknown Security Protocol: %s", value);
+      return false;
     }
   }
 
@@ -289,122 +362,320 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
     value = "";
     if (context->getDynamicProperty(key, value) && !value.empty()) {
       logger_->log_debug("PublishKafka: DynamicProperty: [%s] -> [%s]", key, value);
-      rd_kafka_conf_set(conf_, key.c_str(), value.c_str(), errstr, sizeof(errstr));
+      result = rd_kafka_conf_set(conf_, key.c_str(), value.c_str(), errstr.data(), errstr.size());
+      if (result != RD_KAFKA_CONF_OK) {
+        logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+        return false;
+      }
     } else {
       logger_->log_warn("PublishKafka Dynamic Property '%s' is empty and therefore will not be configured", key);
     }
   }
 
+  // Set the delivery callback
+  rd_kafka_conf_set_dr_msg_cb(conf_, &PublishKafka::messageDeliveryCallback);
+
   // Set the logger callback
-  rd_kafka_conf_set_log_cb(conf_, KafkaConnection::logCallback);
+  rd_kafka_conf_set_log_cb(conf_, &KafkaConnection::logCallback);
+
+  rd_kafka_t* producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, errstr.data(), errstr.size());
+
+  if (producer == nullptr) {
+    logger_->log_error("Failed to create Kafka producer %s", errstr.data());
+    return false;
+  }
+
+  // The producer took ownership of the configuration, we must not free it
+  confGuard.disable();
+
+  conn->setConnection(producer);
+
+  return true;
+}
+
+bool PublishKafka::createNewTopic(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context, const std::string& topic_name) {
+  rd_kafka_topic_conf_t* topic_conf_ = rd_kafka_topic_conf_new();
+  if (topic_conf_ == nullptr) {
+    logger_->log_error("Failed to create rd_kafka_topic_conf_t object");
+    return false;
+  }
+  utils::ScopeGuard confGuard([topic_conf_](){
+    rd_kafka_topic_conf_destroy(topic_conf_);
+  });
+
+  rd_kafka_conf_res_t result;
+  std::string value;
+  std::array<char, 512U> errstr;
+  int64_t valInt;
+  std::string valueConf;
 
-  auto producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, errstr, sizeof(errstr));
+  value = "";
+  if (context->getProperty(DeliveryGuarantee.getName(), value) && !value.empty()) {
+    result = rd_kafka_topic_conf_set(topic_conf_, "request.required.acks", value.c_str(), errstr.data(), errstr.size());
+    logger_->log_debug("PublishKafka: request.required.acks [%s]", value);
+    if (result != RD_KAFKA_CONF_OK) {
+      logger_->log_error("PublishKafka: configure request.required.acks error result [%s]", errstr.data());
+      return false;
+    }
+  }
+  value = "";
+  if (context->getProperty(RequestTimeOut.getName(), value) && !value.empty()) {
+    core::TimeUnit unit;
+    if (core::Property::StringToTime(value, valInt, unit) &&
+        core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
+      valueConf = std::to_string(valInt);
+      result = rd_kafka_topic_conf_set(topic_conf_, "request.timeout.ms", valueConf.c_str(), errstr.data(), errstr.size());
+      logger_->log_debug("PublishKafka: request.timeout.ms [%s]", valueConf);
+      if (result != RD_KAFKA_CONF_OK) {
+        logger_->log_error("PublishKafka: configure request.timeout.ms error result [%s]", errstr.data());
+        return false;
+      }
+    }
+  }
+  value = "";
+  if (context->getProperty(MessageTimeOut.getName(), value) && !value.empty()) {
+    core::TimeUnit unit;
+    if (core::Property::StringToTime(value, valInt, unit) &&
+        core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
+      valueConf = std::to_string(valInt);
+      result = rd_kafka_topic_conf_set(topic_conf_, "message.timeout.ms", valueConf.c_str(), errstr.data(), errstr.size());
+      logger_->log_debug("PublishKafka: message.timeout.ms [%s]", valueConf);
+      if (result != RD_KAFKA_CONF_OK) {
+        logger_->log_error("PublishKafka: configure message.timeout.ms error result [%s]", errstr.data());
+        return false;
+      }
+    }
+  }
 
-  if (!producer) {
-    logger_->log_error("Failed to create Kafka producer %s", errstr);
+  rd_kafka_topic_t* topic_reference = rd_kafka_topic_new(conn->getConnection(), topic_name.c_str(), topic_conf_);
+  if (topic_reference == nullptr) {
+    rd_kafka_resp_err_t resp_err = rd_kafka_last_error();
+    logger_->log_error("PublishKafka: failed to create topic %s, error: %s", topic_name.c_str(), rd_kafka_err2str(resp_err));
     return false;
   }
 
-  conn->setConnection(producer, conf_);
+  // The topic took ownership of the configuration, we must not free it
+  confGuard.disable();
+
+  std::shared_ptr<KafkaTopic> kafkaTopicref = std::make_shared<KafkaTopic>(topic_reference);
+
+  conn->putTopic(topic_name, kafkaTopicref);
 
   return true;
 }
 
 void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  logger_->log_trace("Enter trigger");
-  std::shared_ptr<core::FlowFile> flowFile = session->get();
+  // Check whether we have been interrupted
+  if (interrupted_) {
+    logger_->log_info("The processor has been interrupted, not running onTrigger");
+    context->yield();
+    return;
+  }
 
-  if (!flowFile) {
+  // Try to get a KafkaConnection
+  std::string client_id, brokers;
+  if (!context->getProperty(ClientName.getName(), client_id)) {
+    logger_->log_error("Client Name property missing or invalid");
+    context->yield();
+    return;
+  }
+  if (!context->getProperty(SeedBrokers.getName(), brokers)) {
+    logger_->log_error("Knowb Brokers property missing or invalid");
+    context->yield();
     return;
   }
 
-  std::string client_id, brokers, topic;
+  KafkaConnectionKey key;
+  key.brokers_ = brokers;
+  key.client_id_ = client_id;
 
-  std::unique_ptr<KafkaLease> lease;
-  std::shared_ptr<KafkaConnection> conn;
-// get the client ID, brokers, and topic from either the flowfile, the configuration, or the properties
-  if (context->getProperty(ClientName, client_id, flowFile) && context->getProperty(SeedBrokers, brokers, flowFile) && context->getProperty(Topic, topic, flowFile)) {
-    KafkaConnectionKey key;
-    key.brokers_ = brokers;
-    key.client_id_ = client_id;
+  std::unique_ptr<KafkaLease> lease = connection_pool_.getOrCreateConnection(key);
+  if (lease == nullptr) {
+    logger_->log_info("This connection is used by another thread.");
+    context->yield();
+    return;
+  }
 
-    lease = connection_pool_.getOrCreateConnection(key);
-    if (lease == nullptr) {
-      logger_->log_info("This connection is used by another thread.");
+  std::shared_ptr<KafkaConnection> conn = lease->getConn();
+  if (!conn->initialized()) {
+    logger_->log_trace("Connection not initialized to %s, %s", client_id, brokers);
+    if (!configureNewConnection(conn, context)) {
+      logger_->log_error("Could not configure Kafka Connection");
       context->yield();
       return;
     }
-    conn = lease->getConn();
-
-    if (!conn->initialized()) {
-      logger_->log_trace("Connection not initialized to %s, %s, %s", client_id, brokers, topic);
-      if (!configureNewConnection(conn, context, flowFile)) {
-        logger_->log_error("Could not configure Kafka Connection");
-        session->transfer(flowFile, Failure);
-        return;
-      }
-    }
+  }
 
-    if (!conn->hasTopic(topic)) {
-      auto topic_conf_ = rd_kafka_topic_conf_new();
-      auto topic_reference = rd_kafka_topic_new(conn->getConnection(), topic.c_str(), topic_conf_);
-      rd_kafka_conf_res_t result;
-      std::string value;
-      char errstr[512];
-      int64_t valInt;
-      std::string valueConf;
-
-      if (context->getProperty(DeliveryGuarantee, value, flowFile) && !value.empty()) {
-        rd_kafka_topic_conf_set(topic_conf_, "request.required.acks", value.c_str(), errstr, sizeof(errstr));
-        logger_->log_debug("PublishKafka: request.required.acks [%s]", value);
-        if (result != RD_KAFKA_CONF_OK)
-          logger_->log_error("PublishKafka: configure delivery guarantee error result [%s]", errstr);
-      }
-      value = "";
-      if (context->getProperty(RequestTimeOut, value, flowFile) && !value.empty()) {
-        core::TimeUnit unit;
-        if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
-          valueConf = std::to_string(valInt);
-          rd_kafka_topic_conf_set(topic_conf_, "request.timeout.ms", valueConf.c_str(), errstr, sizeof(errstr));
-          logger_->log_debug("PublishKafka: request.timeout.ms [%s]", valueConf);
-          if (result != RD_KAFKA_CONF_OK)
-            logger_->log_error("PublishKafka: configure timeout error result [%s]", errstr);
-        }
-      }
+  // Get some properties not (only) used directly to set up librdkafka
+  std::string value;
 
-      std::shared_ptr<KafkaTopic> kafkaTopicref = std::make_shared<KafkaTopic>(topic_reference, topic_conf_);
+  // Batch Size
+  uint32_t batch_size;
+  value = "";
+  if (context->getProperty(BatchSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, batch_size)) {
+    logger_->log_debug("PublishKafka: Batch Size [%lu]", batch_size);
+  } else {
+    batch_size = 10;
+  }
 
-      conn->putTopic(topic, kafkaTopicref);
-    }
+  // Target Batch Payload Size
+  uint64_t target_batch_payload_size;
+  value = "";
+  if (context->getProperty(TargetBatchPayloadSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, target_batch_payload_size)) {
+    logger_->log_debug("PublishKafka: Target Batch Payload Size [%llu]", target_batch_payload_size);
   } else {
-    logger_->log_error("Do not have required properties");
-    session->transfer(flowFile, Failure);
-    return;
+    target_batch_payload_size = 512 * 1024U;
   }
 
-  std::string kafkaKey;
-  kafkaKey = "";
-  if (context->getDynamicProperty(MessageKeyField, kafkaKey, flowFile) && !kafkaKey.empty()) {
-    logger_->log_debug("PublishKafka: Message Key Field [%s]", kafkaKey);
+  // Max Flow Segment Size
+  uint64_t max_flow_seg_size;
+  value = "";
+  if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, max_flow_seg_size)) {
+    logger_->log_debug("PublishKafka: Max Flow Segment Size [%llu]", max_flow_seg_size);
   } else {
-    kafkaKey = flowFile->getUUIDStr();
+    max_flow_seg_size = 0U;
+  }
+
+  // Attributes to Send as Headers
+  utils::Regex attributeNameRegex;
+  value = "";
+  if (context->getProperty(AttributeNameRegex.getName(), value) && !value.empty()) {
+    attributeNameRegex = utils::Regex(value);
+    logger_->log_debug("PublishKafka: AttributeNameRegex [%s]", value);
+  }
+
+  // Collect FlowFiles to process
+  uint64_t actual_bytes = 0U;
+  std::vector<std::shared_ptr<core::FlowFile>> flowFiles;
+  for (uint32_t i = 0; i < batch_size; i++) {
+    std::shared_ptr<core::FlowFile> flowFile = session->get();
+    if (flowFile == nullptr) {
+      break;
+    }
+    actual_bytes += flowFile->getSize();
+    flowFiles.emplace_back(std::move(flowFile));
+    if (target_batch_payload_size != 0U && actual_bytes >= target_batch_payload_size) {
+      break;
+    }
+  }
+  if (flowFiles.empty()) {
+    context->yield();
+    return;
   }
+  logger_->log_debug("Processing %lu flow files with a total size of %llu B", flowFiles.size(), actual_bytes);
+
+  auto messages = std::make_shared<Messages>();
+  // We must add this to the messages set, so that it will be interrupted when notifyStop is called
+  {
+    std::lock_guard<std::mutex> lock(messages_mutex_);
+    messages_set_.emplace(messages);
+  }
+  // We also have to insure that it will be removed once we are done with it
+  utils::ScopeGuard messagesSetGuard([&]() {
+    std::lock_guard<std::mutex> lock(messages_mutex_);
+    messages_set_.erase(messages);
+  });
+
+  // Process FlowFiles
+  for (auto& flowFile : flowFiles) {
+    size_t flow_file_index = messages->addFlowFile();
+
+    // Get Topic (FlowFile-dependent EL property)
+    std::string topic;
+    if (!context->getProperty(Topic, topic, flowFile)) {
+      logger_->log_error("Flow file %s does not have a valid Topic", flowFile->getUUIDStr());
+      messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
+        flow_file_result.flow_file_error = true;
+      });
+      continue;
+    }
+
+    // Add topic to the connection if needed
+    if (!conn->hasTopic(topic)) {
+      if (!createNewTopic(conn, context, topic)) {
+        logger_->log_error("Failed to add topic %s", topic);
+        messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
+          flow_file_result.flow_file_error = true;
+        });
+        continue;
+      }
+    }
 
-  auto thisTopic = conn->getTopic(topic);
-  if (thisTopic) {
-    PublishKafka::ReadCallback callback(max_seg_size_, kafkaKey, thisTopic->getTopic(), conn->getConnection(), flowFile, attributeNameRegex);
+    std::string kafkaKey;
+    kafkaKey = "";
+    if (context->getDynamicProperty(MessageKeyField, kafkaKey, flowFile) && !kafkaKey.empty()) {
+      logger_->log_debug("PublishKafka: Message Key Field [%s]", kafkaKey);
+    } else {
+      kafkaKey = flowFile->getUUIDStr();
+    }
+
+    auto thisTopic = conn->getTopic(topic);
+    if (thisTopic == nullptr) {
+      logger_->log_error("Topic %s is invalid", topic);
+      messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
+        flow_file_result.flow_file_error = true;
+      });
+      continue;
+    }
+
+    PublishKafka::ReadCallback callback(max_flow_seg_size, kafkaKey, thisTopic->getTopic(), conn->getConnection(), flowFile,
+                                        attributeNameRegex, messages, flow_file_index);
     session->read(flowFile, &callback);
     if (callback.status_ < 0) {
-      logger_->log_error("Failed to send flow to kafka topic %s", topic);
-      session->transfer(flowFile, Failure);
-    } else {
-      logger_->log_debug("Sent flow with length %d to kafka topic %s", callback.read_size_, topic);
-      session->transfer(flowFile, Success);
+      logger_->log_error("Failed to send flow to kafka topic %s, error: %s", topic, callback.error_);
+      messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
+        flow_file_result.flow_file_error = true;
+      });
+      continue;
     }
-  } else {
-    logger_->log_error("Topic %s is invalid", topic);
-    session->transfer(flowFile, Failure);
   }
+
+  logger_->log_trace("PublishKafka::onTrigger waitForCompletion start");
+  messages->waitForCompletion();
+  if (messages->wasInterrupted()) {
+    logger_->log_warn("Waiting for delivery confirmation was interrupted, some flow files might be routed to Failure, even if they were successfully delivered.");
+  }
+  logger_->log_trace("PublishKafka::onTrigger waitForCompletion finish");
+
+  messages->iterateFlowFiles([&](size_t index, const FlowFileResult& flow_file) {
+    bool success;
+    if (flow_file.flow_file_error) {
+      success = false;
+    } else if (flow_file.messages.empty()) {
+      success = false;
+      logger_->log_error("Assertion error: no messages found for flow file %s", flowFiles[index]->getUUIDStr());
+    } else {
+      success = true;
+      for (size_t segment_num = 0; segment_num < flow_file.messages.size(); segment_num++) {
+        const auto& message = flow_file.messages[segment_num];
+        switch (message.status) {
+          case MessageStatus::MESSAGESTATUS_UNCOMPLETE:
+            success = false;
+            logger_->log_error("Waiting for delivery confirmation was interrupted for flow file %s segment %zu",
+                flowFiles[index]->getUUIDStr(),
+                segment_num);
+          break;
+          case MessageStatus::MESSAGESTATUS_ERROR:
+            success = false;
+            logger_->log_error("Failed to deliver flow file %s segment %zu, error: %s",
+                flowFiles[index]->getUUIDStr(),
+                segment_num,
+                rd_kafka_err2str(message.err_code));
+          break;
+          case MessageStatus::MESSAGESTATUS_SUCCESS:
+            logger_->log_debug("Successfully delivered flow file %s segment %zu",
+                flowFiles[index]->getUUIDStr(),
+                segment_num);
+          break;
+        }
+      }
+    }
+    if (success) {
+      session->transfer(flowFiles[index], Success);
+    } else {
+      session->transfer(flowFiles[index], Failure);
+    }
+  });
 }
 
 } /* namespace processors */
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index 4d0a759..2881f49 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -28,9 +28,15 @@
 #include "core/Property.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "core/logging/Logger.h"
+#include "utils/RegexUtils.h"
 #include "rdkafka.h"
 #include "KafkaPool.h"
-#include <regex>
+#include <atomic>
+#include <map>
+#include <set>
+#include <mutex>
+#include <cstdint>
+#include <condition_variable>
 
 namespace org {
 namespace apache {
@@ -63,8 +69,8 @@ class PublishKafka : public core::Processor {
   explicit PublishKafka(std::string name, utils::Identifier uuid = utils::Identifier())
       : core::Processor(name, uuid),
         connection_pool_(5),
-        logger_(logging::LoggerFactory<PublishKafka>::getLogger()) {
-    max_seg_size_ = -1;
+        logger_(logging::LoggerFactory<PublishKafka>::getLogger()),
+        interrupted_(false) {
   }
   // Destructor
   virtual ~PublishKafka() {
@@ -77,8 +83,10 @@ class PublishKafka : public core::Processor {
   static core::Property DeliveryGuarantee;
   static core::Property MaxMessageSize;
   static core::Property RequestTimeOut;
+  static core::Property MessageTimeOut;
   static core::Property ClientName;
   static core::Property BatchSize;
+  static core::Property TargetBatchPayloadSize;
   static core::Property AttributeNameRegex;
   static core::Property QueueBufferMaxTime;
   static core::Property QueueBufferMaxSize;
@@ -100,29 +108,124 @@ class PublishKafka : public core::Processor {
   static core::Relationship Failure;
   static core::Relationship Success;
 
+  // Message
+  enum class MessageStatus : uint8_t {
+    MESSAGESTATUS_UNCOMPLETE,
+    MESSAGESTATUS_ERROR,
+    MESSAGESTATUS_SUCCESS
+  };
+
+  struct MessageResult {
+    MessageStatus status;
+    rd_kafka_resp_err_t err_code;
+
+    MessageResult()
+        : status(MessageStatus::MESSAGESTATUS_UNCOMPLETE) {
+    }
+  };
+  struct FlowFileResult {
+    bool flow_file_error;
+    std::vector<MessageResult> messages;
+
+    FlowFileResult()
+        : flow_file_error(false) {
+    }
+  };
+  struct Messages {
+    std::mutex mutex;
+    std::condition_variable cv;
+    std::vector<FlowFileResult> flow_files;
+    bool interrupted;
+
+    Messages()
+        : interrupted(false) {
+    }
+
+    void waitForCompletion() {
+      std::unique_lock<std::mutex> lock(mutex);
+      cv.wait(lock, [this]() -> bool {
+        if (interrupted) {
+          return true;
+        }
+        size_t index = 0U;
+        return std::all_of(this->flow_files.begin(), this->flow_files.end(), [&](const FlowFileResult& flow_file) {
+          index++;
+          if (flow_file.flow_file_error) {
+            return true;
+          }
+          return std::all_of(flow_file.messages.begin(), flow_file.messages.end(), [](const MessageResult& message) {
+            return message.status != MessageStatus::MESSAGESTATUS_UNCOMPLETE;
+          });
+        });
+      });
+    }
+
+    void modifyResult(size_t index, const std::function<void(FlowFileResult&)>& fun) {
+      std::unique_lock<std::mutex> lock(mutex);
+      fun(flow_files.at(index));
+      cv.notify_all();
+    }
+
+    size_t addFlowFile() {
+      std::lock_guard<std::mutex> lock(mutex);
+      flow_files.emplace_back();
+      return flow_files.size() - 1;
+    }
+
+    void iterateFlowFiles(const std::function<void(size_t /*index*/, const FlowFileResult& /*flow_file_result*/)>& fun) {
+      std::lock_guard<std::mutex> lock(mutex);
+      for (size_t index = 0U; index < flow_files.size(); index++) {
+        fun(index, flow_files[index]);
+      }
+    }
+
+    void interrupt() {
+      std::unique_lock<std::mutex> lock(mutex);
+      interrupted = true;
+      cv.notify_all();
+    }
+
+    bool wasInterrupted() {
+      std::lock_guard<std::mutex> lock(mutex);
+      return interrupted;
+    }
+  };
+
   // Nest Callback Class for read stream
   class ReadCallback : public InputStreamCallback {
    public:
-    ReadCallback(uint64_t max_seg_size, const std::string &key, rd_kafka_topic_t *rkt, rd_kafka_t *rk, const std::shared_ptr<core::FlowFile> flowFile, const std::regex &attributeNameRegex)
+    ReadCallback(uint64_t max_seg_size,
+                 const std::string &key,
+                 rd_kafka_topic_t *rkt,
+                 rd_kafka_t *rk,
+                 const std::shared_ptr<core::FlowFile> flowFile,
+                 utils::Regex &attributeNameRegex,
+                 std::shared_ptr<Messages> messages,
+                 size_t flow_file_index)
         : max_seg_size_(max_seg_size),
           key_(key),
           rkt_(rkt),
           rk_(rk),
           flowFile_(flowFile),
+          messages_(std::move(messages)),
+          flow_file_index_(flow_file_index),
           attributeNameRegex_(attributeNameRegex) {
       flow_size_ = flowFile_->getSize();
       status_ = 0;
       read_size_ = 0;
       hdrs = nullptr;
     }
+
     ~ReadCallback() {
       if (hdrs) {
         rd_kafka_headers_destroy(hdrs);
       }
     }
+
     int64_t process(std::shared_ptr<io::BaseStream> stream) {
-      if (flow_size_ < max_seg_size_)
+      if (max_seg_size_ == 0U || flow_size_ < max_seg_size_) {
         max_seg_size_ = flow_size_;
+      }
       std::vector<unsigned char> buffer;
       buffer.reserve(max_seg_size_);
       read_size_ = 0;
@@ -130,7 +233,7 @@ class PublishKafka : public core::Processor {
       rd_kafka_resp_err_t err;
 
       for (auto kv : flowFile_->getAttributes()) {
-        if (regex_match(kv.first, attributeNameRegex_)) {
+        if(attributeNameRegex_.match(kv.first)) {
           if (!hdrs) {
             hdrs = rd_kafka_headers_new(8);
           }
@@ -138,36 +241,60 @@ class PublishKafka : public core::Processor {
         }
       }
 
+      size_t segment_num = 0U;
       while (read_size_ < flow_size_) {
         int readRet = stream->read(&buffer[0], max_seg_size_);
         if (readRet < 0) {
           status_ = -1;
+          error_ = "Failed to read from stream";
           return read_size_;
         }
         if (readRet > 0) {
+          messages_->modifyResult(flow_file_index_, [](FlowFileResult& flow_file) {
+            flow_file.messages.resize(flow_file.messages.size() + 1);
+          });
+          auto messages_copy = this->messages_;
+          auto flow_file_index_copy = this->flow_file_index_;
+          auto callback = std::unique_ptr<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>>(
+              new std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>(
+                [messages_copy, flow_file_index_copy, segment_num](rd_kafka_t* /*rk*/, const rd_kafka_message_t* rkmessage) {
+                  messages_copy->modifyResult(flow_file_index_copy, [segment_num, rkmessage](FlowFileResult& flow_file) {
+                    auto& message = flow_file.messages.at(segment_num);
+                    message.err_code = rkmessage->err;
+                    message.status = message.err_code == 0 ? MessageStatus::MESSAGESTATUS_SUCCESS : MessageStatus::MESSAGESTATUS_ERROR;
+                  });
+                }));
           if (hdrs) {
             rd_kafka_headers_t *hdrs_copy;
             hdrs_copy = rd_kafka_headers_copy(hdrs);
             err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(&buffer[0], readRet),
-                                    RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_END);
+                                    RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback.release()), RD_KAFKA_V_END);
             if (err) {
               rd_kafka_headers_destroy(hdrs_copy);
             }
           } else {
             err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(&buffer[0], readRet),
-                                    RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_END);
+                                    RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback.release()), RD_KAFKA_V_END);
           }
           if (err) {
+            messages_->modifyResult(flow_file_index_, [segment_num, err](FlowFileResult& flow_file) {
+              auto& message = flow_file.messages.at(segment_num);
+              message.status = MessageStatus::MESSAGESTATUS_ERROR;
+              message.err_code = err;
+            });
             status_ = -1;
+            error_ = rd_kafka_err2str(err);
             return read_size_;
           }
           read_size_ += readRet;
         } else {
           break;
         }
+        segment_num++;
       }
       return read_size_;
     }
+
     uint64_t flow_size_;
     uint64_t max_seg_size_;
     std::string key_;
@@ -175,14 +302,17 @@ class PublishKafka : public core::Processor {
     rd_kafka_t *rk_;
     rd_kafka_headers_t *hdrs;
     std::shared_ptr<core::FlowFile> flowFile_;
+    std::shared_ptr<Messages> messages_;
+    size_t flow_file_index_;
     int status_;
+    std::string error_;
     int read_size_;
-    std::regex attributeNameRegex_;
+    utils::Regex& attributeNameRegex_;
   };
 
  public:
 
-  virtual bool supportsDynamicProperties() {
+  virtual bool supportsDynamicProperties() override {
     return true;
   }
 
@@ -195,23 +325,23 @@ class PublishKafka : public core::Processor {
   virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
   virtual void initialize() override;
   virtual void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+  virtual void notifyStop() override;
 
  protected:
 
-  bool configureNewConnection(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &ff);
+  bool configureNewConnection(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context);
+  bool createNewTopic(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context, const std::string& topic_name);
 
  private:
+  static void messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque);
+
   std::shared_ptr<logging::Logger> logger_;
 
   KafkaPool connection_pool_;
 
-//  rd_kafka_conf_t *conf_;
-  //rd_kafka_t *rk_;
-  //1rd_kafka_topic_conf_t *topic_conf_;
-  //rd_kafka_topic_t *rkt_;
-  //std::string topic_;
-  uint64_t max_seg_size_;
-  std::regex attributeNameRegex;
+  std::atomic<bool> interrupted_;
+  std::mutex messages_mutex_;
+  std::set<std::shared_ptr<Messages>> messages_set_;
 };
 
 REGISTER_RESOURCE(PublishKafka, "This Processor puts the contents of a FlowFile to a Topic in Apache Kafka. The content of a FlowFile becomes the contents of a Kafka message. "
diff --git a/thirdparty/librdkafka/librdkafka-libressl.patch b/thirdparty/librdkafka/librdkafka-libressl.patch
new file mode 100644
index 0000000..c99a9d9
--- /dev/null
+++ b/thirdparty/librdkafka/librdkafka-libressl.patch
@@ -0,0 +1,148 @@
+diff --git a/src/rdkafka_transport.c b/src/rdkafka_transport.c
+index 9c7fc36c..de083109 100644
+--- a/src/rdkafka_transport.c
++++ b/src/rdkafka_transport.c
+@@ -3,24 +3,24 @@
+  *
+  * Copyright (c) 2015, Magnus Edenhill
+  * All rights reserved.
+- * 
++ *
+  * Redistribution and use in source and binary forms, with or without
+- * modification, are permitted provided that the following conditions are met: 
+- * 
++ * modification, are permitted provided that the following conditions are met:
++ *
+  * 1. Redistributions of source code must retain the above copyright notice,
+- *    this list of conditions and the following disclaimer. 
++ *    this list of conditions and the following disclaimer.
+  * 2. Redistributions in binary form must reproduce the above copyright notice,
+  *    this list of conditions and the following disclaimer in the documentation
+- *    and/or other materials provided with the distribution. 
+- * 
++ *    and/or other materials provided with the distribution.
++ *
+  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
+- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
+- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
+- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
+- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
+- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
+- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
++ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
++ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
++ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
++ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
++ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
++ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
++ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+  * POSSIBILITY OF SUCH DAMAGE.
+@@ -433,7 +433,7 @@ static char *rd_kafka_ssl_error (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
+ 		else
+ 			rd_kafka_log(rk, LOG_ERR, "SSL", "%s", errstr);
+ 	}
+-	
++
+ 	ERR_error_string_n(l, buf, sizeof(buf));
+ 
+ 	rd_snprintf(errstr, errstr_size, "%s:%d: %s: %s",
+@@ -443,7 +443,7 @@ static char *rd_kafka_ssl_error (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
+ 
+     if (cnt == 0)
+     	    rd_snprintf(errstr, errstr_size, "No error");
+-    
++
+     return errstr;
+ }
+ 
+@@ -507,7 +507,7 @@ void rd_kafka_transport_ssl_term (void) {
+ void rd_kafka_transport_ssl_init (void) {
+ #if OPENSSL_VERSION_NUMBER < 0x10100000L
+ 	int i;
+-	
++
+ 	if (!CRYPTO_get_locking_callback()) {
+ 		rd_kafka_ssl_locks_cnt = CRYPTO_num_locks();
+ 		rd_kafka_ssl_locks = rd_malloc(rd_kafka_ssl_locks_cnt *
+@@ -736,11 +736,11 @@ static int rd_kafka_transport_ssl_connect (rd_kafka_broker_t *rkb,
+ 		return 0;
+ 	}
+ 
+-		
++
+ 	if (rd_kafka_transport_ssl_io_update(rktrans, r,
+ 					     errstr, errstr_size) == -1)
+ 		return -1;
+-	
++
+ 	return 0;
+ 
+  fail:
+@@ -863,17 +863,6 @@ int rd_kafka_transport_ssl_ctx_init (rd_kafka_t *rk,
+ 	int r;
+ 	SSL_CTX *ctx;
+ 
+-#if OPENSSL_VERSION_NUMBER >= 0x10100000
+-        rd_kafka_dbg(rk, SECURITY, "OPENSSL", "Using OpenSSL version %s "
+-                     "(0x%lx, librdkafka built with 0x%lx)",
+-                     OpenSSL_version(OPENSSL_VERSION),
+-                     OpenSSL_version_num(),
+-                     OPENSSL_VERSION_NUMBER);
+-#else
+-        rd_kafka_dbg(rk, SECURITY, "OPENSSL", "librdkafka built with OpenSSL "
+-                     "version 0x%lx", OPENSSL_VERSION_NUMBER);
+-#endif
+-
+         if (errstr_size > 0)
+                 errstr[0] = '\0';
+ 
+@@ -945,7 +934,7 @@ int rd_kafka_transport_ssl_ctx_init (rd_kafka_t *rk,
+ 			     "Loading CA certificate(s) from %s %s",
+ 			     is_dir ? "directory":"file",
+ 			     rk->rk_conf.ssl.ca_location);
+-		
++
+ 		r = SSL_CTX_load_verify_locations(ctx,
+ 						  !is_dir ?
+ 						  rk->rk_conf.ssl.
+@@ -1034,8 +1023,8 @@ int rd_kafka_transport_ssl_ctx_init (rd_kafka_t *rk,
+ 
+ 		if (!(fp = fopen(rk->rk_conf.ssl.keystore_location, "rb"))) {
+ 			rd_snprintf(errstr, errstr_size,
+-				    "Failed to open ssl.keystore.location: %s: %s", 
+-				    rk->rk_conf.ssl.keystore_location, 
++				    "Failed to open ssl.keystore.location: %s: %s",
++				    rk->rk_conf.ssl.keystore_location,
+ 				    rd_strerror(errno));
+ 			goto fail;
+ 		}
+@@ -1495,7 +1484,7 @@ static void rd_kafka_transport_io_event (rd_kafka_transport_t *rktrans,
+ /**
+  * Poll and serve IOs
+  *
+- * Locality: broker thread 
++ * Locality: broker thread
+  */
+ void rd_kafka_transport_io_serve (rd_kafka_transport_t *rktrans,
+                                   int timeout_ms) {
+@@ -1545,7 +1534,7 @@ rd_kafka_transport_t *rd_kafka_transport_connect (rd_kafka_broker_t *rkb,
+ 
+ #ifdef SO_NOSIGPIPE
+ 	/* Disable SIGPIPE signalling for this socket on OSX */
+-	if (setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on)) == -1) 
++	if (setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on)) == -1)
+ 		rd_rkb_dbg(rkb, BROKER, "SOCKET",
+ 			   "Failed to set SO_NOSIGPIPE: %s",
+ 			   socket_strerror(socket_errno));
+@@ -1709,7 +1698,7 @@ void rd_kafka_transport_term (void) {
+ #endif
+ }
+ #endif
+- 
++
+ void rd_kafka_transport_init(void) {
+ #ifdef _MSC_VER
+ 	WSADATA d;