Posted to commits@nifi.apache.org by ab...@apache.org on 2019/09/26 08:40:44 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-1033 - PublishKafka fixes
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new baac0a0 MINIFICPP-1033 - PublishKafka fixes
baac0a0 is described below
commit baac0a08af4db0f8169e68e4618f021716f41338
Author: Daniel Bakai <ba...@gmail.com>
AuthorDate: Wed Sep 25 04:43:34 2019 +0200
MINIFICPP-1033 - PublishKafka fixes
Fixes:
- We now use delivery completion callbacks to properly determine successful delivery (see the sketch below the tear line)
- The 'Delivery Guarantee' property had an invalid default value
- Most of the configuration errors from librdkafka were silently ignored
- rd_kafka_conf_t and rd_kafka_topic_conf_t objects were improperly destroyed
after their ownership had been taken by rd_kafka_t and rd_kafka_topic_t, respectively
MINIFICPP-1033 - SSL fixes
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #653
---
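Note: the two librdkafka contracts these fixes revolve around, as a minimal
standalone sketch (the produce_one() helper below is hypothetical and not part
of this commit): rd_kafka_new() adopts the rd_kafka_conf_t on success, so the
conf must only be destroyed on the failure paths; and delivery success is only
known when the delivery report callback fires from rd_kafka_poll()/rd_kafka_flush(),
not when rd_kafka_producev() returns.

#include <cstdio>
#include "rdkafka.h"

// Delivery report callback: librdkafka calls this from rd_kafka_poll()/
// rd_kafka_flush() once the broker acks the message or the send fails for good.
static void dr_msg_cb(rd_kafka_t* /*rk*/, const rd_kafka_message_t* msg, void* /*opaque*/) {
  if (msg->err != RD_KAFKA_RESP_ERR_NO_ERROR)
    std::fprintf(stderr, "delivery failed: %s\n", rd_kafka_err2str(msg->err));
}

// Hypothetical helper, for illustration only.
static bool produce_one(const char* brokers, const char* topic, const char* payload, size_t len) {
  char errstr[512];
  rd_kafka_conf_t* conf = rd_kafka_conf_new();
  if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    std::fprintf(stderr, "config error: %s\n", errstr);  // must not be silently ignored
    rd_kafka_conf_destroy(conf);  // conf is still ours on this path
    return false;
  }
  rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
  rd_kafka_t* rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
  if (rk == nullptr) {
    rd_kafka_conf_destroy(conf);  // creation failed, conf was not adopted
    return false;
  }
  // From here on rk owns conf; destroying conf now would be a double free.
  rd_kafka_resp_err_t err = rd_kafka_producev(rk,
      RD_KAFKA_V_TOPIC(topic),
      RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
      RD_KAFKA_V_VALUE(const_cast<char*>(payload), len),
      RD_KAFKA_V_END);  // enqueue only; the delivery outcome arrives via dr_msg_cb
  rd_kafka_flush(rk, 10 * 1000);  // serves the pending delivery reports
  rd_kafka_destroy(rk);  // frees the producer and the adopted conf
  return err == RD_KAFKA_RESP_ERR_NO_ERROR;
}

The actual processor additionally attaches a per-message opaque via
RD_KAFKA_V_OPAQUE() so each segment's delivery report can be matched back to
its flow file; see messageDeliveryCallback() and ReadCallback::process() in
the diff below.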
NOTICE | 5 +
cmake/FindPatch.cmake | 69 +++
extensions/librdkafka/CMakeLists.txt | 9 +-
extensions/librdkafka/KafkaConnection.cpp | 21 +-
extensions/librdkafka/KafkaConnection.h | 31 +-
extensions/librdkafka/KafkaTopic.h | 14 +-
extensions/librdkafka/PublishKafka.cpp | 627 +++++++++++++++++-------
extensions/librdkafka/PublishKafka.h | 166 ++++++-
thirdparty/librdkafka/librdkafka-libressl.patch | 148 ++++++
9 files changed, 860 insertions(+), 230 deletions(-)
diff --git a/NOTICE b/NOTICE
index 252860b..7dcbc81 100644
--- a/NOTICE
+++ b/NOTICE
@@ -29,3 +29,8 @@ Copyright (c) 1996 - 2019, Daniel Stenberg, <da...@haxx.se>, and many contribut
The derived work is adapted from
CMake/FindLibSSH2.cmake
and can be found in cmake/libssh2/sys/FindLibSSH2.cmake
+
+This includes derived works from the CMake (BSD 3-Clause licensed) project (https://github.com/Kitware/CMake):
+Copyright 2000-2019 Kitware, Inc. and Contributors
+The derived work is adapted from
+ Modules/FindPatch.cmake
diff --git a/cmake/FindPatch.cmake b/cmake/FindPatch.cmake
new file mode 100644
index 0000000..a81e0eb
--- /dev/null
+++ b/cmake/FindPatch.cmake
@@ -0,0 +1,69 @@
+# Distributed under the OSI-approved BSD 3-Clause License. See accompanying
+# file Copyright.txt or https://cmake.org/licensing for details.
+
+#[=======================================================================[.rst:
+FindPatch
+---------
+
+The module defines the following variables:
+
+``Patch_EXECUTABLE``
+ Path to patch command-line executable.
+``Patch_FOUND``
+ True if the patch command-line executable was found.
+
+The following :prop_tgt:`IMPORTED` targets are also defined:
+
+``Patch::patch``
+ The command-line executable.
+
+Example usage:
+
+.. code-block:: cmake
+
+ find_package(Patch)
+ if(Patch_FOUND)
+ message("Patch found: ${Patch_EXECUTABLE}")
+ endif()
+#]=======================================================================]
+
+set(_doc "Patch command line executable")
+set(_patch_path )
+
+if(CMAKE_HOST_WIN32)
+ set(_patch_path
+ "$ENV{LOCALAPPDATA}/Programs/Git/bin"
+ "$ENV{LOCALAPPDATA}/Programs/Git/usr/bin"
+ "$ENV{APPDATA}/Programs/Git/bin"
+ "$ENV{APPDATA}/Programs/Git/usr/bin"
+ )
+endif()
+
+# First search the PATH
+find_program(Patch_EXECUTABLE
+ NAME patch
+ PATHS ${_patch_path}
+ DOC ${_doc}
+ )
+
+if(CMAKE_HOST_WIN32)
+ # Now look for installations in Git/ directories under typical installation
+ # prefixes on Windows.
+ find_program(Patch_EXECUTABLE
+ NAMES patch
+ PATH_SUFFIXES Git/usr/bin Git/bin GnuWin32/bin
+ DOC ${_doc}
+ )
+endif()
+
+if(Patch_EXECUTABLE AND NOT TARGET Patch::patch)
+ add_executable(Patch::patch IMPORTED)
+ set_property(TARGET Patch::patch PROPERTY IMPORTED_LOCATION ${Patch_EXECUTABLE})
+endif()
+
+unset(_patch_path)
+unset(_doc)
+
+include(FindPackageHandleStandardArgs)
+find_package_handle_standard_args(Patch
+ REQUIRED_VARS Patch_EXECUTABLE)
diff --git a/extensions/librdkafka/CMakeLists.txt b/extensions/librdkafka/CMakeLists.txt
index 769b0e3..aa74bbf 100644
--- a/extensions/librdkafka/CMakeLists.txt
+++ b/extensions/librdkafka/CMakeLists.txt
@@ -43,16 +43,19 @@ if(WIN32 OR NOT USE_SYSTEM_ZLIB)
endif()
string(REPLACE ";" "%" CMAKE_MODULE_PATH_PASSTHROUGH "${CMAKE_MODULE_PATH_PASSTHROUGH_LIST}")
+find_package(Patch REQUIRED)
+
ExternalProject_Add(
kafka-external
- GIT_REPOSITORY "https://github.com/edenhill/librdkafka.git"
- GIT_TAG "v1.0.1"
+ URL "https://github.com/edenhill/librdkafka/archive/v1.0.1.tar.gz"
+ URL_HASH "SHA256=b2a2defa77c0ef8c508739022a197886e0644bd7bf6179de1b68bdffb02b3550"
+ PATCH_COMMAND "${Patch_EXECUTABLE}" -p1 -i "${CMAKE_SOURCE_DIR}/thirdparty/librdkafka/librdkafka-libressl.patch"
PREFIX "${BASE_DIR}"
LIST_SEPARATOR % # This is needed for passing semicolon-separated lists
CMAKE_ARGS ${PASSTHROUGH_CMAKE_ARGS}
"-DCMAKE_INSTALL_PREFIX=${BASE_DIR}/install"
"-DWITH_SASL=OFF"
- "-DOPENSSL_VERSION=1.0.2"
+ "-DWITH_SSL=ON"
"-DRDKAFKA_BUILD_STATIC=ON"
"-DRDKAFKA_BUILD_EXAMPLES=OFF"
"-DRDKAFKA_BUILD_TESTS=OFF"
diff --git a/extensions/librdkafka/KafkaConnection.cpp b/extensions/librdkafka/KafkaConnection.cpp
index 28b0b45..04b327d 100644
--- a/extensions/librdkafka/KafkaConnection.cpp
+++ b/extensions/librdkafka/KafkaConnection.cpp
@@ -25,19 +25,24 @@ namespace processors {
KafkaConnection::KafkaConnection(const KafkaConnectionKey &key)
: logger_(logging::LoggerFactory<KafkaConnection>::getLogger()),
- conf_(nullptr),
- kafka_connection_(nullptr) {
+ kafka_connection_(nullptr),
+ poll_(false) {
lease_ = false;
initialized_ = false;
key_ = key;
}
+KafkaConnection::~KafkaConnection() {
+ remove();
+}
+
void KafkaConnection::remove() {
topics_.clear();
removeConnection();
}
void KafkaConnection::removeConnection() {
+ stopPoll();
if (kafka_connection_) {
rd_kafka_flush(kafka_connection_, 10 * 1000); /* wait for max 10 seconds */
rd_kafka_destroy(kafka_connection_);
@@ -46,10 +51,6 @@ void KafkaConnection::removeConnection() {
});
kafka_connection_ = nullptr;
}
- if (conf_) {
- rd_kafka_conf_destroy(conf_);
- conf_ = nullptr;
- }
initialized_ = false;
}
@@ -57,18 +58,14 @@ bool KafkaConnection::initialized() const {
return initialized_;
}
-void KafkaConnection::setConnection(rd_kafka_t *producer, rd_kafka_conf_t *conf) {
+void KafkaConnection::setConnection(rd_kafka_t *producer) {
removeConnection();
kafka_connection_ = producer;
- conf_ = conf;
initialized_ = true;
modifyLoggers([&](std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>>& loggers) {
loggers[producer] = logger_;
});
-}
-
-rd_kafka_conf_t *KafkaConnection::getConf() const {
- return conf_;
+ startPoll();
}
rd_kafka_t *KafkaConnection::getConnection() const {
diff --git a/extensions/librdkafka/KafkaConnection.h b/extensions/librdkafka/KafkaConnection.h
index 6e158fe..774754b 100644
--- a/extensions/librdkafka/KafkaConnection.h
+++ b/extensions/librdkafka/KafkaConnection.h
@@ -18,6 +18,7 @@
#ifndef NIFI_MINIFI_CPP_KAFKACONNECTION_H
#define NIFI_MINIFI_CPP_KAFKACONNECTION_H
+#include <atomic>
#include <mutex>
#include <string>
#include "core/logging/LoggerConfiguration.h"
@@ -46,9 +47,7 @@ class KafkaConnection {
explicit KafkaConnection(const KafkaConnectionKey &key);
- ~KafkaConnection() {
- remove();
- }
+ ~KafkaConnection();
void remove();
@@ -56,9 +55,7 @@ class KafkaConnection {
bool initialized() const;
- void setConnection(rd_kafka_t *producer, rd_kafka_conf_t *conf);
-
- rd_kafka_conf_t *getConf() const;
+ void setConnection(rd_kafka_t *producer);
rd_kafka_t *getConnection() const;
@@ -90,9 +87,11 @@ class KafkaConnection {
std::map<std::string, std::shared_ptr<KafkaTopic>> topics_;
- rd_kafka_conf_t *conf_;
rd_kafka_t *kafka_connection_;
+ std::atomic<bool> poll_;
+ std::thread thread_kafka_poll_;
+
static void modifyLoggers(const std::function<void(std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>>&)>& func) {
static std::mutex loggers_mutex;
static std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>> loggers;
@@ -100,6 +99,24 @@ class KafkaConnection {
std::lock_guard<std::mutex> lock(loggers_mutex);
func(loggers);
}
+
+ void stopPoll() {
+ poll_ = false;
+ logger_->log_debug("Stop polling");
+ if (thread_kafka_poll_.joinable()) {
+ thread_kafka_poll_.join();
+ }
+ }
+
+ void startPoll() {
+ poll_ = true;
+ logger_->log_debug("Start polling");
+ thread_kafka_poll_ = std::thread([this]{
+ while (this->poll_) {
+ rd_kafka_poll(this->kafka_connection_, 1000);
+ }
+ });
+ }
};
class KafkaLease {
diff --git a/extensions/librdkafka/KafkaTopic.h b/extensions/librdkafka/KafkaTopic.h
index 30c34a0..f7b8f6e 100644
--- a/extensions/librdkafka/KafkaTopic.h
+++ b/extensions/librdkafka/KafkaTopic.h
@@ -28,9 +28,8 @@ namespace processors {
class KafkaTopic {
public:
- KafkaTopic(rd_kafka_topic_t *topic_reference, rd_kafka_topic_conf_t *topic_conf)
- : topic_conf_(topic_conf),
- topic_reference_(topic_reference) {
+ KafkaTopic(rd_kafka_topic_t *topic_reference)
+ : topic_reference_(topic_reference) {
}
@@ -38,21 +37,12 @@ class KafkaTopic {
if (topic_reference_) {
rd_kafka_topic_destroy(topic_reference_);
}
- if (topic_conf_) {
- rd_kafka_topic_conf_destroy(topic_conf_);
- }
- }
-
- rd_kafka_topic_conf_t *getTopicConf() const {
- return topic_conf_;
}
-
rd_kafka_topic_t *getTopic() const {
return topic_reference_;
}
private:
- rd_kafka_topic_conf_t *topic_conf_;
rd_kafka_topic_t *topic_reference_;
};
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index b248c1e..75f8839 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -26,6 +26,7 @@
#include <set>
#include "utils/TimeUtil.h"
#include "utils/StringUtils.h"
+#include "utils/ScopeGuard.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
@@ -36,37 +37,51 @@ namespace minifi {
namespace processors {
core::Property PublishKafka::SeedBrokers(
- core::PropertyBuilder::createProperty("Known Brokers")->withDescription("A comma-separated list of known Kafka Brokers in the format <host>:<port>")->isRequired(true)->supportsExpressionLanguage(
- true)->build());
+ core::PropertyBuilder::createProperty("Known Brokers")->withDescription("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
+ ->isRequired(true)->supportsExpressionLanguage(true)->build());
-core::Property PublishKafka::Topic(core::PropertyBuilder::createProperty("Topic Name")->withDescription("The Kafka Topic of interest")->isRequired(true)->supportsExpressionLanguage(true)->build());
+core::Property PublishKafka::Topic(
+ core::PropertyBuilder::createProperty("Topic Name")->withDescription("The Kafka Topic of interest")
+ ->isRequired(true)->supportsExpressionLanguage(true)->build());
core::Property PublishKafka::DeliveryGuarantee(
- core::PropertyBuilder::createProperty("Delivery Guarantee")->withDescription("Specifies the requirement for guaranteeing that a message is sent to Kafka")->isRequired(false)
- ->supportsExpressionLanguage(true)->withDefaultValue("DELIVERY_ONE_NODE")->build());
+ core::PropertyBuilder::createProperty("Delivery Guarantee")->withDescription("Specifies the requirement for guaranteeing that a message is sent to Kafka")
+ ->isRequired(false)->supportsExpressionLanguage(true)->withDefaultValue(DELIVERY_ONE_NODE)->build());
-core::Property PublishKafka::MaxMessageSize(core::PropertyBuilder::createProperty("Max Request Size")->withDescription("Maximum Kafka protocol request message size")->isRequired(false)->build());
+core::Property PublishKafka::MaxMessageSize(
+ core::PropertyBuilder::createProperty("Max Request Size")->withDescription("Maximum Kafka protocol request message size")
+ ->isRequired(false)->build());
core::Property PublishKafka::RequestTimeOut(
- core::PropertyBuilder::createProperty("Request Timeout")->withDescription("The ack timeout of the producer request in milliseconds")->isRequired(false)->withDefaultValue<core::TimePeriodValue>(
- "10 sec")->supportsExpressionLanguage(true)->build());
+ core::PropertyBuilder::createProperty("Request Timeout")->withDescription("The ack timeout of the producer request")
+ ->isRequired(false)->withDefaultValue<core::TimePeriodValue>("10 sec")->supportsExpressionLanguage(true)->build());
+
+core::Property PublishKafka::MessageTimeOut(
+ core::PropertyBuilder::createProperty("Message Timeout")->withDescription("The total time sending a message could take")
+ ->isRequired(false)->withDefaultValue<core::TimePeriodValue>("30 sec")->supportsExpressionLanguage(true)->build());
core::Property PublishKafka::ClientName(
- core::PropertyBuilder::createProperty("Client Name")->withDescription("Client Name to use when communicating with Kafka")->isRequired(true)->supportsExpressionLanguage(true)->build());
+ core::PropertyBuilder::createProperty("Client Name")->withDescription("Client Name to use when communicating with Kafka")
+ ->isRequired(true)->supportsExpressionLanguage(true)->build());
/**
* These don't appear to need EL support
*/
core::Property PublishKafka::BatchSize(
- core::PropertyBuilder::createProperty("Batch Size")->withDescription("Maximum number of messages batched in one MessageSet")->isRequired(false)->withDefaultValue<uint32_t>(10)->build());
-
+ core::PropertyBuilder::createProperty("Batch Size")->withDescription("Maximum number of messages batched in one MessageSet")
+ ->isRequired(false)->withDefaultValue<uint32_t>(10)->build());
+core::Property PublishKafka::TargetBatchPayloadSize(
+ core::PropertyBuilder::createProperty("Target Batch Payload Size")->withDescription("The target total payload size for a batch. 0 B means unlimited (Batch Size is still applied).")
+ ->isRequired(false)->withDefaultValue<core::DataSizeValue>("512 KB")->build());
core::Property PublishKafka::AttributeNameRegex("Attributes to Send as Headers", "Any attribute whose name matches the regex will be added to the Kafka messages as a Header", "");
core::Property PublishKafka::QueueBufferMaxTime("Queue Buffering Max Time", "Delay to wait for messages in the producer queue to accumulate before constructing message batches", "");
core::Property PublishKafka::QueueBufferMaxSize("Queue Max Buffer Size", "Maximum total message size sum allowed on the producer queue", "");
core::Property PublishKafka::QueueBufferMaxMessage("Queue Max Message", "Maximum number of messages allowed on the producer queue", "");
core::Property PublishKafka::CompressCodec("Compress Codec", "compression codec to use for compressing message sets", COMPRESSION_CODEC_NONE);
-core::Property PublishKafka::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the kafka record", "");
+core::Property PublishKafka::MaxFlowSegSize(
+ core::PropertyBuilder::createProperty("Max Flow Segment Size")->withDescription("Maximum flow content payload segment size for the kafka record. 0 B means unlimited.")
+ ->isRequired(false)->withDefaultValue<core::DataSizeValue>("0 B")->build());
core::Property PublishKafka::SecurityProtocol("Security Protocol", "Protocol used to communicate with brokers", "");
core::Property PublishKafka::SecurityCA("Security CA", "File or directory path to CA certificate(s) for verifying the broker's key", "");
core::Property PublishKafka::SecurityCert("Security Cert", "Path to client's public key (PEM) used for authentication", "");
@@ -81,6 +96,7 @@ core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of a
"");
core::Property PublishKafka::DebugContexts("Debug contexts", "A comma-separated list of debug contexts to enable."
"Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all", "");
+
core::Relationship PublishKafka::Success("success", "Any FlowFile that is successfully sent to Kafka will be routed to this Relationship");
core::Relationship PublishKafka::Failure("failure", "Any FlowFile that cannot be sent to Kafka will be routed to this Relationship");
@@ -92,9 +108,11 @@ void PublishKafka::initialize() {
properties.insert(DeliveryGuarantee);
properties.insert(MaxMessageSize);
properties.insert(RequestTimeOut);
+ properties.insert(MessageTimeOut);
properties.insert(ClientName);
properties.insert(AttributeNameRegex);
properties.insert(BatchSize);
+ properties.insert(TargetBatchPayloadSize);
properties.insert(QueueBufferMaxTime);
properties.insert(QueueBufferMaxSize);
properties.insert(QueueBufferMaxMessage);
@@ -119,165 +137,220 @@ void PublishKafka::initialize() {
}
void PublishKafka::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+ interrupted_ = false;
+}
+
+void PublishKafka::notifyStop() {
+ logger_->log_debug("notifyStop called");
+ interrupted_ = true;
+ std::lock_guard<std::mutex> lock(messages_mutex_);
+ for (auto& messages : messages_set_) {
+ messages->interrupt();
+ }
}
-bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &ff) {
+/**
+ * Message delivery report callback using the richer rd_kafka_message_t object.
+ */
+void PublishKafka::messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* /*opaque*/) {
+ if (rkmessage->_private == nullptr) {
+ return;
+ }
+ std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>* func =
+ reinterpret_cast<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>*>(rkmessage->_private);
+ try {
+ (*func)(rk, rkmessage);
+ } catch (...) {
+ }
+ delete func;
+}
+
+bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context) {
std::string value;
int64_t valInt;
std::string valueConf;
- char errstr[512];
+ std::array<char, 512U> errstr;
rd_kafka_conf_res_t result;
- auto conf_ = rd_kafka_conf_new();
+ rd_kafka_conf_t* conf_ = rd_kafka_conf_new();
+ if (conf_ == nullptr) {
+ logger_->log_error("Failed to create rd_kafka_conf_t object");
+ return false;
+ }
+ utils::ScopeGuard confGuard([conf_](){
+ rd_kafka_conf_destroy(conf_);
+ });
auto key = conn->getKey();
- if (context->getProperty(DebugContexts.getName(), value) && !value.empty()) {
- rd_kafka_conf_set(conf_, "debug", value.c_str(), errstr, sizeof(errstr));
- logger_->log_debug("PublishKafka: debug properties [%s]", value);
- if (result != RD_KAFKA_CONF_OK)
- logger_->log_error("PublishKafka: configure debug properties error result [%s]", errstr);
- }
-
- if (!key->brokers_.empty()) {
- result = rd_kafka_conf_set(conf_, "bootstrap.servers", key->brokers_.c_str(), errstr, sizeof(errstr));
- logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_);
- if (result != RD_KAFKA_CONF_OK)
- logger_->log_error("PublishKafka: configure error result [%s]", errstr);
- } else {
+ if (key->brokers_.empty()) {
logger_->log_error("There are no brokers");
return false;
}
+ result = rd_kafka_conf_set(conf_, "bootstrap.servers", key->brokers_.c_str(), errstr.data(), errstr.size());
+ logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_);
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+ return false;
+ }
- if (!key->client_id_.empty()) {
- rd_kafka_conf_set(conf_, "client.id", key->client_id_.c_str(), errstr, sizeof(errstr));
- logger_->log_debug("PublishKafka: client.id [%s]", key->client_id_);
- if (result != RD_KAFKA_CONF_OK)
- logger_->log_error("PublishKafka: configure error result [%s]", errstr);
- } else {
+ if (key->client_id_.empty()) {
logger_->log_error("Client id is empty");
return false;
}
+ result = rd_kafka_conf_set(conf_, "client.id", key->client_id_.c_str(), errstr.data(), errstr.size());
+ logger_->log_debug("PublishKafka: client.id [%s]", key->client_id_);
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+ return false;
+ }
-// Kerberos configuration
+ value = "";
+ if (context->getProperty(DebugContexts.getName(), value) && !value.empty()) {
+ result = rd_kafka_conf_set(conf_, "debug", value.c_str(), errstr.data(), errstr.size());
+ logger_->log_debug("PublishKafka: debug [%s]", value);
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure debug error result [%s]", errstr.data());
+ return false;
+ }
+ }
+ value = "";
if (context->getProperty(KerberosServiceName.getName(), value) && !value.empty()) {
- result = rd_kafka_conf_set(conf_, "sasl.kerberos.service.name", value.c_str(), errstr, sizeof(errstr));
+ result = rd_kafka_conf_set(conf_, "sasl.kerberos.service.name", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: sasl.kerberos.service.name [%s]", value);
- if (result != RD_KAFKA_CONF_OK)
- logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+ return false;
+ }
}
value = "";
if (context->getProperty(KerberosPrincipal.getName(), value) && !value.empty()) {
- result = rd_kafka_conf_set(conf_, "sasl.kerberos.principal", value.c_str(), errstr, sizeof(errstr));
+ result = rd_kafka_conf_set(conf_, "sasl.kerberos.principal", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: sasl.kerberos.principal [%s]", value);
- if (result != RD_KAFKA_CONF_OK)
- logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+ return false;
+ }
}
value = "";
if (context->getProperty(KerberosKeytabPath.getName(), value) && !value.empty()) {
- result = rd_kafka_conf_set(conf_, "sasl.kerberos.keytab", value.c_str(), errstr, sizeof(errstr));
+ result = rd_kafka_conf_set(conf_, "sasl.kerberos.keytab", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: sasl.kerberos.keytab [%s]", value);
- if (result != RD_KAFKA_CONF_OK)
- logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+ return false;
+ }
}
value = "";
if (context->getProperty(MaxMessageSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
valueConf = std::to_string(valInt);
- result = rd_kafka_conf_set(conf_, "message.max.bytes", valueConf.c_str(), errstr, sizeof(errstr));
+ result = rd_kafka_conf_set(conf_, "message.max.bytes", valueConf.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: message.max.bytes [%s]", valueConf);
- if (result != RD_KAFKA_CONF_OK)
- logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+ return false;
+ }
}
value = "";
if (context->getProperty(QueueBufferMaxMessage.getName(), value) && !value.empty()) {
- rd_kafka_conf_set(conf_, "queue.buffering.max.messages", value.c_str(), errstr, sizeof(errstr));
+ result = rd_kafka_conf_set(conf_, "queue.buffering.max.messages", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: queue.buffering.max.messages [%s]", value);
- if (result != RD_KAFKA_CONF_OK)
- logger_->log_error("PublishKafka: configure error result [%s]", errstr);
- }
- value = "";
- if (context->getProperty(AttributeNameRegex.getName(), value) && !value.empty()) {
- attributeNameRegex.assign(value);
- logger_->log_debug("PublishKafka: AttributeNameRegex %s", value);
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+ return false;
+ }
}
value = "";
if (context->getProperty(QueueBufferMaxSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
valInt = valInt / 1024;
valueConf = std::to_string(valInt);
- rd_kafka_conf_set(conf_, "queue.buffering.max.kbytes", valueConf.c_str(), errstr, sizeof(errstr));
+ result = rd_kafka_conf_set(conf_, "queue.buffering.max.kbytes", valueConf.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: queue.buffering.max.kbytes [%s]", valueConf);
- if (result != RD_KAFKA_CONF_OK)
- logger_->log_error("PublishKafka: configure error result [%s]", errstr);
- }
- value = "";
- max_seg_size_ = ULLONG_MAX;
- if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
- max_seg_size_ = valInt;
- logger_->log_debug("PublishKafka: max flow segment size [%llu]", max_seg_size_);
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+ return false;
+ }
}
value = "";
if (context->getProperty(QueueBufferMaxTime.getName(), value) && !value.empty()) {
core::TimeUnit unit;
if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
valueConf = std::to_string(valInt);
- rd_kafka_conf_set(conf_, "queue.buffering.max.ms", valueConf.c_str(), errstr, sizeof(errstr));
+ result = rd_kafka_conf_set(conf_, "queue.buffering.max.ms", valueConf.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: queue.buffering.max.ms [%s]", valueConf);
- if (result != RD_KAFKA_CONF_OK)
- logger_->log_error("PublishKafka: configure queue buffer error result [%s]", errstr);
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure queue buffer error result [%s]", errstr.data());
+ return false;
+ }
}
}
value = "";
if (context->getProperty(BatchSize.getName(), value) && !value.empty()) {
- rd_kafka_conf_set(conf_, "batch.num.messages", value.c_str(), errstr, sizeof(errstr));
+ result = rd_kafka_conf_set(conf_, "batch.num.messages", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: batch.num.messages [%s]", value);
- if (result != RD_KAFKA_CONF_OK)
- logger_->log_error("PublishKafka: configure batch size error result [%s]", errstr);
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure batch size error result [%s]", errstr.data());
+ return false;
+ }
}
value = "";
if (context->getProperty(CompressCodec.getName(), value) && !value.empty() && value != "none") {
- rd_kafka_conf_set(conf_, "compression.codec", value.c_str(), errstr, sizeof(errstr));
+ result = rd_kafka_conf_set(conf_, "compression.codec", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: compression.codec [%s]", value);
- if (result != RD_KAFKA_CONF_OK)
- logger_->log_error("PublishKafka: configure compression codec error result [%s]", errstr);
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure compression codec error result [%s]", errstr.data());
+ return false;
+ }
}
value = "";
if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) {
if (value == SECURITY_PROTOCOL_SSL) {
- rd_kafka_conf_set(conf_, "security.protocol", value.c_str(), errstr, sizeof(errstr));
+ result = rd_kafka_conf_set(conf_, "security.protocol", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: security.protocol [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
- logger_->log_error("PublishKafka: configure error result [%s]", errstr);
- } else {
- value = "";
- if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) {
- rd_kafka_conf_set(conf_, "ssl.ca.location", value.c_str(), errstr, sizeof(errstr));
- logger_->log_debug("PublishKafka: ssl.ca.location [%s]", value);
- if (result != RD_KAFKA_CONF_OK)
- logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+ logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+ return false;
+ }
+ value = "";
+ if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) {
+ result = rd_kafka_conf_set(conf_, "ssl.ca.location", value.c_str(), errstr.data(), errstr.size());
+ logger_->log_debug("PublishKafka: ssl.ca.location [%s]", value);
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+ return false;
}
- value = "";
- if (context->getProperty(SecurityCert.getName(), value) && !value.empty()) {
- rd_kafka_conf_set(conf_, "ssl.certificate.location", value.c_str(), errstr, sizeof(errstr));
- logger_->log_debug("PublishKafka: ssl.certificate.location [%s]", value);
- if (result != RD_KAFKA_CONF_OK)
- logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+ }
+ value = "";
+ if (context->getProperty(SecurityCert.getName(), value) && !value.empty()) {
+ result = rd_kafka_conf_set(conf_, "ssl.certificate.location", value.c_str(), errstr.data(), errstr.size());
+ logger_->log_debug("PublishKafka: ssl.certificate.location [%s]", value);
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+ return false;
}
- value = "";
- if (context->getProperty(SecurityPrivateKey.getName(), value) && !value.empty()) {
- rd_kafka_conf_set(conf_, "ssl.key.location", value.c_str(), errstr, sizeof(errstr));
- logger_->log_debug("PublishKafka: ssl.key.location [%s]", value);
- if (result != RD_KAFKA_CONF_OK)
- logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+ }
+ value = "";
+ if (context->getProperty(SecurityPrivateKey.getName(), value) && !value.empty()) {
+ result = rd_kafka_conf_set(conf_, "ssl.key.location", value.c_str(), errstr.data(), errstr.size());
+ logger_->log_debug("PublishKafka: ssl.key.location [%s]", value);
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+ return false;
}
- value = "";
- if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && !value.empty()) {
- rd_kafka_conf_set(conf_, "ssl.key.password", value.c_str(), errstr, sizeof(errstr));
- logger_->log_debug("PublishKafka: ssl.key.password [%s]", value);
- if (result != RD_KAFKA_CONF_OK)
- logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+ }
+ value = "";
+ if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && !value.empty()) {
+ result = rd_kafka_conf_set(conf_, "ssl.key.password", value.c_str(), errstr.data(), errstr.size());
+ logger_->log_debug("PublishKafka: ssl.key.password [%s]", value);
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+ return false;
}
}
+ } else {
+ logger_->log_error("PublishKafka: unknown Security Protocol: %s", value);
+ return false;
}
}
@@ -289,122 +362,320 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
value = "";
if (context->getDynamicProperty(key, value) && !value.empty()) {
logger_->log_debug("PublishKafka: DynamicProperty: [%s] -> [%s]", key, value);
- rd_kafka_conf_set(conf_, key.c_str(), value.c_str(), errstr, sizeof(errstr));
+ result = rd_kafka_conf_set(conf_, key.c_str(), value.c_str(), errstr.data(), errstr.size());
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+ return false;
+ }
} else {
logger_->log_warn("PublishKafka Dynamic Property '%s' is empty and therefore will not be configured", key);
}
}
+ // Set the delivery callback
+ rd_kafka_conf_set_dr_msg_cb(conf_, &PublishKafka::messageDeliveryCallback);
+
// Set the logger callback
- rd_kafka_conf_set_log_cb(conf_, KafkaConnection::logCallback);
+ rd_kafka_conf_set_log_cb(conf_, &KafkaConnection::logCallback);
+
+ rd_kafka_t* producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, errstr.data(), errstr.size());
+
+ if (producer == nullptr) {
+ logger_->log_error("Failed to create Kafka producer %s", errstr.data());
+ return false;
+ }
+
+ // The producer took ownership of the configuration; we must not free it
+ confGuard.disable();
+
+ conn->setConnection(producer);
+
+ return true;
+}
+
+bool PublishKafka::createNewTopic(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context, const std::string& topic_name) {
+ rd_kafka_topic_conf_t* topic_conf_ = rd_kafka_topic_conf_new();
+ if (topic_conf_ == nullptr) {
+ logger_->log_error("Failed to create rd_kafka_topic_conf_t object");
+ return false;
+ }
+ utils::ScopeGuard confGuard([topic_conf_](){
+ rd_kafka_topic_conf_destroy(topic_conf_);
+ });
+
+ rd_kafka_conf_res_t result;
+ std::string value;
+ std::array<char, 512U> errstr;
+ int64_t valInt;
+ std::string valueConf;
- auto producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, errstr, sizeof(errstr));
+ value = "";
+ if (context->getProperty(DeliveryGuarantee.getName(), value) && !value.empty()) {
+ result = rd_kafka_topic_conf_set(topic_conf_, "request.required.acks", value.c_str(), errstr.data(), errstr.size());
+ logger_->log_debug("PublishKafka: request.required.acks [%s]", value);
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure request.required.acks error result [%s]", errstr.data());
+ return false;
+ }
+ }
+ value = "";
+ if (context->getProperty(RequestTimeOut.getName(), value) && !value.empty()) {
+ core::TimeUnit unit;
+ if (core::Property::StringToTime(value, valInt, unit) &&
+ core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
+ valueConf = std::to_string(valInt);
+ result = rd_kafka_topic_conf_set(topic_conf_, "request.timeout.ms", valueConf.c_str(), errstr.data(), errstr.size());
+ logger_->log_debug("PublishKafka: request.timeout.ms [%s]", valueConf);
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure request.timeout.ms error result [%s]", errstr.data());
+ return false;
+ }
+ }
+ }
+ value = "";
+ if (context->getProperty(MessageTimeOut.getName(), value) && !value.empty()) {
+ core::TimeUnit unit;
+ if (core::Property::StringToTime(value, valInt, unit) &&
+ core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
+ valueConf = std::to_string(valInt);
+ result = rd_kafka_topic_conf_set(topic_conf_, "message.timeout.ms", valueConf.c_str(), errstr.data(), errstr.size());
+ logger_->log_debug("PublishKafka: message.timeout.ms [%s]", valueConf);
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure message.timeout.ms error result [%s]", errstr.data());
+ return false;
+ }
+ }
+ }
- if (!producer) {
- logger_->log_error("Failed to create Kafka producer %s", errstr);
+ rd_kafka_topic_t* topic_reference = rd_kafka_topic_new(conn->getConnection(), topic_name.c_str(), topic_conf_);
+ if (topic_reference == nullptr) {
+ rd_kafka_resp_err_t resp_err = rd_kafka_last_error();
+ logger_->log_error("PublishKafka: failed to create topic %s, error: %s", topic_name.c_str(), rd_kafka_err2str(resp_err));
return false;
}
- conn->setConnection(producer, conf_);
+ // The topic took ownership of the configuration; we must not free it
+ confGuard.disable();
+
+ std::shared_ptr<KafkaTopic> kafkaTopicref = std::make_shared<KafkaTopic>(topic_reference);
+
+ conn->putTopic(topic_name, kafkaTopicref);
return true;
}
void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
- logger_->log_trace("Enter trigger");
- std::shared_ptr<core::FlowFile> flowFile = session->get();
+ // Check whether we have been interrupted
+ if (interrupted_) {
+ logger_->log_info("The processor has been interrupted, not running onTrigger");
+ context->yield();
+ return;
+ }
- if (!flowFile) {
+ // Try to get a KafkaConnection
+ std::string client_id, brokers;
+ if (!context->getProperty(ClientName.getName(), client_id)) {
+ logger_->log_error("Client Name property missing or invalid");
+ context->yield();
+ return;
+ }
+ if (!context->getProperty(SeedBrokers.getName(), brokers)) {
+ logger_->log_error("Knowb Brokers property missing or invalid");
+ context->yield();
return;
}
- std::string client_id, brokers, topic;
+ KafkaConnectionKey key;
+ key.brokers_ = brokers;
+ key.client_id_ = client_id;
- std::unique_ptr<KafkaLease> lease;
- std::shared_ptr<KafkaConnection> conn;
-// get the client ID, brokers, and topic from either the flowfile, the configuration, or the properties
- if (context->getProperty(ClientName, client_id, flowFile) && context->getProperty(SeedBrokers, brokers, flowFile) && context->getProperty(Topic, topic, flowFile)) {
- KafkaConnectionKey key;
- key.brokers_ = brokers;
- key.client_id_ = client_id;
+ std::unique_ptr<KafkaLease> lease = connection_pool_.getOrCreateConnection(key);
+ if (lease == nullptr) {
+ logger_->log_info("This connection is used by another thread.");
+ context->yield();
+ return;
+ }
- lease = connection_pool_.getOrCreateConnection(key);
- if (lease == nullptr) {
- logger_->log_info("This connection is used by another thread.");
+ std::shared_ptr<KafkaConnection> conn = lease->getConn();
+ if (!conn->initialized()) {
+ logger_->log_trace("Connection not initialized to %s, %s", client_id, brokers);
+ if (!configureNewConnection(conn, context)) {
+ logger_->log_error("Could not configure Kafka Connection");
context->yield();
return;
}
- conn = lease->getConn();
-
- if (!conn->initialized()) {
- logger_->log_trace("Connection not initialized to %s, %s, %s", client_id, brokers, topic);
- if (!configureNewConnection(conn, context, flowFile)) {
- logger_->log_error("Could not configure Kafka Connection");
- session->transfer(flowFile, Failure);
- return;
- }
- }
+ }
- if (!conn->hasTopic(topic)) {
- auto topic_conf_ = rd_kafka_topic_conf_new();
- auto topic_reference = rd_kafka_topic_new(conn->getConnection(), topic.c_str(), topic_conf_);
- rd_kafka_conf_res_t result;
- std::string value;
- char errstr[512];
- int64_t valInt;
- std::string valueConf;
-
- if (context->getProperty(DeliveryGuarantee, value, flowFile) && !value.empty()) {
- rd_kafka_topic_conf_set(topic_conf_, "request.required.acks", value.c_str(), errstr, sizeof(errstr));
- logger_->log_debug("PublishKafka: request.required.acks [%s]", value);
- if (result != RD_KAFKA_CONF_OK)
- logger_->log_error("PublishKafka: configure delivery guarantee error result [%s]", errstr);
- }
- value = "";
- if (context->getProperty(RequestTimeOut, value, flowFile) && !value.empty()) {
- core::TimeUnit unit;
- if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
- valueConf = std::to_string(valInt);
- rd_kafka_topic_conf_set(topic_conf_, "request.timeout.ms", valueConf.c_str(), errstr, sizeof(errstr));
- logger_->log_debug("PublishKafka: request.timeout.ms [%s]", valueConf);
- if (result != RD_KAFKA_CONF_OK)
- logger_->log_error("PublishKafka: configure timeout error result [%s]", errstr);
- }
- }
+ // Get some properties not (only) used directly to set up librdkafka
+ std::string value;
- std::shared_ptr<KafkaTopic> kafkaTopicref = std::make_shared<KafkaTopic>(topic_reference, topic_conf_);
+ // Batch Size
+ uint32_t batch_size;
+ value = "";
+ if (context->getProperty(BatchSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, batch_size)) {
+ logger_->log_debug("PublishKafka: Batch Size [%lu]", batch_size);
+ } else {
+ batch_size = 10;
+ }
- conn->putTopic(topic, kafkaTopicref);
- }
+ // Target Batch Payload Size
+ uint64_t target_batch_payload_size;
+ value = "";
+ if (context->getProperty(TargetBatchPayloadSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, target_batch_payload_size)) {
+ logger_->log_debug("PublishKafka: Target Batch Payload Size [%llu]", target_batch_payload_size);
} else {
- logger_->log_error("Do not have required properties");
- session->transfer(flowFile, Failure);
- return;
+ target_batch_payload_size = 512 * 1024U;
}
- std::string kafkaKey;
- kafkaKey = "";
- if (context->getDynamicProperty(MessageKeyField, kafkaKey, flowFile) && !kafkaKey.empty()) {
- logger_->log_debug("PublishKafka: Message Key Field [%s]", kafkaKey);
+ // Max Flow Segment Size
+ uint64_t max_flow_seg_size;
+ value = "";
+ if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, max_flow_seg_size)) {
+ logger_->log_debug("PublishKafka: Max Flow Segment Size [%llu]", max_flow_seg_size);
} else {
- kafkaKey = flowFile->getUUIDStr();
+ max_flow_seg_size = 0U;
+ }
+
+ // Attributes to Send as Headers
+ utils::Regex attributeNameRegex;
+ value = "";
+ if (context->getProperty(AttributeNameRegex.getName(), value) && !value.empty()) {
+ attributeNameRegex = utils::Regex(value);
+ logger_->log_debug("PublishKafka: AttributeNameRegex [%s]", value);
+ }
+
+ // Collect FlowFiles to process
+ uint64_t actual_bytes = 0U;
+ std::vector<std::shared_ptr<core::FlowFile>> flowFiles;
+ for (uint32_t i = 0; i < batch_size; i++) {
+ std::shared_ptr<core::FlowFile> flowFile = session->get();
+ if (flowFile == nullptr) {
+ break;
+ }
+ actual_bytes += flowFile->getSize();
+ flowFiles.emplace_back(std::move(flowFile));
+ if (target_batch_payload_size != 0U && actual_bytes >= target_batch_payload_size) {
+ break;
+ }
+ }
+ if (flowFiles.empty()) {
+ context->yield();
+ return;
}
+ logger_->log_debug("Processing %lu flow files with a total size of %llu B", flowFiles.size(), actual_bytes);
+
+ auto messages = std::make_shared<Messages>();
+ // We must add this to the messages set, so that it will be interrupted when notifyStop is called
+ {
+ std::lock_guard<std::mutex> lock(messages_mutex_);
+ messages_set_.emplace(messages);
+ }
+ // We also have to ensure that it will be removed once we are done with it
+ utils::ScopeGuard messagesSetGuard([&]() {
+ std::lock_guard<std::mutex> lock(messages_mutex_);
+ messages_set_.erase(messages);
+ });
+
+ // Process FlowFiles
+ for (auto& flowFile : flowFiles) {
+ size_t flow_file_index = messages->addFlowFile();
+
+ // Get Topic (FlowFile-dependent EL property)
+ std::string topic;
+ if (!context->getProperty(Topic, topic, flowFile)) {
+ logger_->log_error("Flow file %s does not have a valid Topic", flowFile->getUUIDStr());
+ messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
+ flow_file_result.flow_file_error = true;
+ });
+ continue;
+ }
+
+ // Add topic to the connection if needed
+ if (!conn->hasTopic(topic)) {
+ if (!createNewTopic(conn, context, topic)) {
+ logger_->log_error("Failed to add topic %s", topic);
+ messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
+ flow_file_result.flow_file_error = true;
+ });
+ continue;
+ }
+ }
- auto thisTopic = conn->getTopic(topic);
- if (thisTopic) {
- PublishKafka::ReadCallback callback(max_seg_size_, kafkaKey, thisTopic->getTopic(), conn->getConnection(), flowFile, attributeNameRegex);
+ std::string kafkaKey;
+ kafkaKey = "";
+ if (context->getDynamicProperty(MessageKeyField, kafkaKey, flowFile) && !kafkaKey.empty()) {
+ logger_->log_debug("PublishKafka: Message Key Field [%s]", kafkaKey);
+ } else {
+ kafkaKey = flowFile->getUUIDStr();
+ }
+
+ auto thisTopic = conn->getTopic(topic);
+ if (thisTopic == nullptr) {
+ logger_->log_error("Topic %s is invalid", topic);
+ messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
+ flow_file_result.flow_file_error = true;
+ });
+ continue;
+ }
+
+ PublishKafka::ReadCallback callback(max_flow_seg_size, kafkaKey, thisTopic->getTopic(), conn->getConnection(), flowFile,
+ attributeNameRegex, messages, flow_file_index);
session->read(flowFile, &callback);
if (callback.status_ < 0) {
- logger_->log_error("Failed to send flow to kafka topic %s", topic);
- session->transfer(flowFile, Failure);
- } else {
- logger_->log_debug("Sent flow with length %d to kafka topic %s", callback.read_size_, topic);
- session->transfer(flowFile, Success);
+ logger_->log_error("Failed to send flow to kafka topic %s, error: %s", topic, callback.error_);
+ messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
+ flow_file_result.flow_file_error = true;
+ });
+ continue;
}
- } else {
- logger_->log_error("Topic %s is invalid", topic);
- session->transfer(flowFile, Failure);
}
+
+ logger_->log_trace("PublishKafka::onTrigger waitForCompletion start");
+ messages->waitForCompletion();
+ if (messages->wasInterrupted()) {
+ logger_->log_warn("Waiting for delivery confirmation was interrupted, some flow files might be routed to Failure, even if they were successfully delivered.");
+ }
+ logger_->log_trace("PublishKafka::onTrigger waitForCompletion finish");
+
+ messages->iterateFlowFiles([&](size_t index, const FlowFileResult& flow_file) {
+ bool success;
+ if (flow_file.flow_file_error) {
+ success = false;
+ } else if (flow_file.messages.empty()) {
+ success = false;
+ logger_->log_error("Assertion error: no messages found for flow file %s", flowFiles[index]->getUUIDStr());
+ } else {
+ success = true;
+ for (size_t segment_num = 0; segment_num < flow_file.messages.size(); segment_num++) {
+ const auto& message = flow_file.messages[segment_num];
+ switch (message.status) {
+ case MessageStatus::MESSAGESTATUS_UNCOMPLETE:
+ success = false;
+ logger_->log_error("Waiting for delivery confirmation was interrupted for flow file %s segment %zu",
+ flowFiles[index]->getUUIDStr(),
+ segment_num);
+ break;
+ case MessageStatus::MESSAGESTATUS_ERROR:
+ success = false;
+ logger_->log_error("Failed to deliver flow file %s segment %zu, error: %s",
+ flowFiles[index]->getUUIDStr(),
+ segment_num,
+ rd_kafka_err2str(message.err_code));
+ break;
+ case MessageStatus::MESSAGESTATUS_SUCCESS:
+ logger_->log_debug("Successfully delivered flow file %s segment %zu",
+ flowFiles[index]->getUUIDStr(),
+ segment_num);
+ break;
+ }
+ }
+ }
+ if (success) {
+ session->transfer(flowFiles[index], Success);
+ } else {
+ session->transfer(flowFiles[index], Failure);
+ }
+ });
}
} /* namespace processors */
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index 4d0a759..2881f49 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -28,9 +28,15 @@
#include "core/Property.h"
#include "core/logging/LoggerConfiguration.h"
#include "core/logging/Logger.h"
+#include "utils/RegexUtils.h"
#include "rdkafka.h"
#include "KafkaPool.h"
-#include <regex>
+#include <atomic>
+#include <map>
+#include <set>
+#include <mutex>
+#include <cstdint>
+#include <condition_variable>
namespace org {
namespace apache {
@@ -63,8 +69,8 @@ class PublishKafka : public core::Processor {
explicit PublishKafka(std::string name, utils::Identifier uuid = utils::Identifier())
: core::Processor(name, uuid),
connection_pool_(5),
- logger_(logging::LoggerFactory<PublishKafka>::getLogger()) {
- max_seg_size_ = -1;
+ logger_(logging::LoggerFactory<PublishKafka>::getLogger()),
+ interrupted_(false) {
}
// Destructor
virtual ~PublishKafka() {
@@ -77,8 +83,10 @@ class PublishKafka : public core::Processor {
static core::Property DeliveryGuarantee;
static core::Property MaxMessageSize;
static core::Property RequestTimeOut;
+ static core::Property MessageTimeOut;
static core::Property ClientName;
static core::Property BatchSize;
+ static core::Property TargetBatchPayloadSize;
static core::Property AttributeNameRegex;
static core::Property QueueBufferMaxTime;
static core::Property QueueBufferMaxSize;
@@ -100,29 +108,124 @@ class PublishKafka : public core::Processor {
static core::Relationship Failure;
static core::Relationship Success;
+ // Message
+ enum class MessageStatus : uint8_t {
+ MESSAGESTATUS_UNCOMPLETE,
+ MESSAGESTATUS_ERROR,
+ MESSAGESTATUS_SUCCESS
+ };
+
+ struct MessageResult {
+ MessageStatus status;
+ rd_kafka_resp_err_t err_code;
+
+ MessageResult()
+ : status(MessageStatus::MESSAGESTATUS_UNCOMPLETE) {
+ }
+ };
+ struct FlowFileResult {
+ bool flow_file_error;
+ std::vector<MessageResult> messages;
+
+ FlowFileResult()
+ : flow_file_error(false) {
+ }
+ };
+ struct Messages {
+ std::mutex mutex;
+ std::condition_variable cv;
+ std::vector<FlowFileResult> flow_files;
+ bool interrupted;
+
+ Messages()
+ : interrupted(false) {
+ }
+
+ void waitForCompletion() {
+ std::unique_lock<std::mutex> lock(mutex);
+ cv.wait(lock, [this]() -> bool {
+ if (interrupted) {
+ return true;
+ }
+ size_t index = 0U;
+ return std::all_of(this->flow_files.begin(), this->flow_files.end(), [&](const FlowFileResult& flow_file) {
+ index++;
+ if (flow_file.flow_file_error) {
+ return true;
+ }
+ return std::all_of(flow_file.messages.begin(), flow_file.messages.end(), [](const MessageResult& message) {
+ return message.status != MessageStatus::MESSAGESTATUS_UNCOMPLETE;
+ });
+ });
+ });
+ }
+
+ void modifyResult(size_t index, const std::function<void(FlowFileResult&)>& fun) {
+ std::unique_lock<std::mutex> lock(mutex);
+ fun(flow_files.at(index));
+ cv.notify_all();
+ }
+
+ size_t addFlowFile() {
+ std::lock_guard<std::mutex> lock(mutex);
+ flow_files.emplace_back();
+ return flow_files.size() - 1;
+ }
+
+ void iterateFlowFiles(const std::function<void(size_t /*index*/, const FlowFileResult& /*flow_file_result*/)>& fun) {
+ std::lock_guard<std::mutex> lock(mutex);
+ for (size_t index = 0U; index < flow_files.size(); index++) {
+ fun(index, flow_files[index]);
+ }
+ }
+
+ void interrupt() {
+ std::unique_lock<std::mutex> lock(mutex);
+ interrupted = true;
+ cv.notify_all();
+ }
+
+ bool wasInterrupted() {
+ std::lock_guard<std::mutex> lock(mutex);
+ return interrupted;
+ }
+ };
+
// Nest Callback Class for read stream
class ReadCallback : public InputStreamCallback {
public:
- ReadCallback(uint64_t max_seg_size, const std::string &key, rd_kafka_topic_t *rkt, rd_kafka_t *rk, const std::shared_ptr<core::FlowFile> flowFile, const std::regex &attributeNameRegex)
+ ReadCallback(uint64_t max_seg_size,
+ const std::string &key,
+ rd_kafka_topic_t *rkt,
+ rd_kafka_t *rk,
+ const std::shared_ptr<core::FlowFile> flowFile,
+ utils::Regex &attributeNameRegex,
+ std::shared_ptr<Messages> messages,
+ size_t flow_file_index)
: max_seg_size_(max_seg_size),
key_(key),
rkt_(rkt),
rk_(rk),
flowFile_(flowFile),
+ messages_(std::move(messages)),
+ flow_file_index_(flow_file_index),
attributeNameRegex_(attributeNameRegex) {
flow_size_ = flowFile_->getSize();
status_ = 0;
read_size_ = 0;
hdrs = nullptr;
}
+
~ReadCallback() {
if (hdrs) {
rd_kafka_headers_destroy(hdrs);
}
}
+
int64_t process(std::shared_ptr<io::BaseStream> stream) {
- if (flow_size_ < max_seg_size_)
+ if (max_seg_size_ == 0U || flow_size_ < max_seg_size_) {
max_seg_size_ = flow_size_;
+ }
std::vector<unsigned char> buffer;
buffer.reserve(max_seg_size_);
read_size_ = 0;
@@ -130,7 +233,7 @@ class PublishKafka : public core::Processor {
rd_kafka_resp_err_t err;
for (auto kv : flowFile_->getAttributes()) {
- if (regex_match(kv.first, attributeNameRegex_)) {
+ if (attributeNameRegex_.match(kv.first)) {
if (!hdrs) {
hdrs = rd_kafka_headers_new(8);
}
@@ -138,36 +241,60 @@ class PublishKafka : public core::Processor {
}
}
+ size_t segment_num = 0U;
while (read_size_ < flow_size_) {
int readRet = stream->read(&buffer[0], max_seg_size_);
if (readRet < 0) {
status_ = -1;
+ error_ = "Failed to read from stream";
return read_size_;
}
if (readRet > 0) {
+ messages_->modifyResult(flow_file_index_, [](FlowFileResult& flow_file) {
+ flow_file.messages.resize(flow_file.messages.size() + 1);
+ });
+ auto messages_copy = this->messages_;
+ auto flow_file_index_copy = this->flow_file_index_;
+ auto callback = std::unique_ptr<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>>(
+ new std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>(
+ [messages_copy, flow_file_index_copy, segment_num](rd_kafka_t* /*rk*/, const rd_kafka_message_t* rkmessage) {
+ messages_copy->modifyResult(flow_file_index_copy, [segment_num, rkmessage](FlowFileResult& flow_file) {
+ auto& message = flow_file.messages.at(segment_num);
+ message.err_code = rkmessage->err;
+ message.status = message.err_code == 0 ? MessageStatus::MESSAGESTATUS_SUCCESS : MessageStatus::MESSAGESTATUS_ERROR;
+ });
+ }));
if (hdrs) {
rd_kafka_headers_t *hdrs_copy;
hdrs_copy = rd_kafka_headers_copy(hdrs);
err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(&buffer[0], readRet),
- RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_END);
+ RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback.release()), RD_KAFKA_V_END);
if (err) {
rd_kafka_headers_destroy(hdrs_copy);
}
} else {
err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(&buffer[0], readRet),
- RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_END);
+ RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback.release()), RD_KAFKA_V_END);
}
if (err) {
+ messages_->modifyResult(flow_file_index_, [segment_num, err](FlowFileResult& flow_file) {
+ auto& message = flow_file.messages.at(segment_num);
+ message.status = MessageStatus::MESSAGESTATUS_ERROR;
+ message.err_code = err;
+ });
status_ = -1;
+ error_ = rd_kafka_err2str(err);
return read_size_;
}
read_size_ += readRet;
} else {
break;
}
+ segment_num++;
}
return read_size_;
}
+
uint64_t flow_size_;
uint64_t max_seg_size_;
std::string key_;
@@ -175,14 +302,17 @@ class PublishKafka : public core::Processor {
rd_kafka_t *rk_;
rd_kafka_headers_t *hdrs;
std::shared_ptr<core::FlowFile> flowFile_;
+ std::shared_ptr<Messages> messages_;
+ size_t flow_file_index_;
int status_;
+ std::string error_;
int read_size_;
- std::regex attributeNameRegex_;
+ utils::Regex& attributeNameRegex_;
};
public:
- virtual bool supportsDynamicProperties() {
+ virtual bool supportsDynamicProperties() override {
return true;
}
@@ -195,23 +325,23 @@ class PublishKafka : public core::Processor {
virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
virtual void initialize() override;
virtual void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+ virtual void notifyStop() override;
protected:
- bool configureNewConnection(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &ff);
+ bool configureNewConnection(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context);
+ bool createNewTopic(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context, const std::string& topic_name);
private:
+ static void messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque);
+
std::shared_ptr<logging::Logger> logger_;
KafkaPool connection_pool_;
-// rd_kafka_conf_t *conf_;
- //rd_kafka_t *rk_;
- //1rd_kafka_topic_conf_t *topic_conf_;
- //rd_kafka_topic_t *rkt_;
- //std::string topic_;
- uint64_t max_seg_size_;
- std::regex attributeNameRegex;
+ std::atomic<bool> interrupted_;
+ std::mutex messages_mutex_;
+ std::set<std::shared_ptr<Messages>> messages_set_;
};
REGISTER_RESOURCE(PublishKafka, "This Processor puts the contents of a FlowFile to a Topic in Apache Kafka. The content of a FlowFile becomes the contents of a Kafka message. "
diff --git a/thirdparty/librdkafka/librdkafka-libressl.patch b/thirdparty/librdkafka/librdkafka-libressl.patch
new file mode 100644
index 0000000..c99a9d9
--- /dev/null
+++ b/thirdparty/librdkafka/librdkafka-libressl.patch
@@ -0,0 +1,148 @@
+diff --git a/src/rdkafka_transport.c b/src/rdkafka_transport.c
+index 9c7fc36c..de083109 100644
+--- a/src/rdkafka_transport.c
++++ b/src/rdkafka_transport.c
+@@ -3,24 +3,24 @@
+ *
+ * Copyright (c) 2015, Magnus Edenhill
+ * All rights reserved.
+- *
++ *
+ * Redistribution and use in source and binary forms, with or without
+- * modification, are permitted provided that the following conditions are met:
+- *
++ * modification, are permitted provided that the following conditions are met:
++ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+- * this list of conditions and the following disclaimer.
++ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+- * and/or other materials provided with the distribution.
+- *
++ * and/or other materials provided with the distribution.
++ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
++ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
++ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
++ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
++ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
++ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
++ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
++ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+@@ -433,7 +433,7 @@ static char *rd_kafka_ssl_error (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
+ else
+ rd_kafka_log(rk, LOG_ERR, "SSL", "%s", errstr);
+ }
+-
++
+ ERR_error_string_n(l, buf, sizeof(buf));
+
+ rd_snprintf(errstr, errstr_size, "%s:%d: %s: %s",
+@@ -443,7 +443,7 @@ static char *rd_kafka_ssl_error (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
+
+ if (cnt == 0)
+ rd_snprintf(errstr, errstr_size, "No error");
+-
++
+ return errstr;
+ }
+
+@@ -507,7 +507,7 @@ void rd_kafka_transport_ssl_term (void) {
+ void rd_kafka_transport_ssl_init (void) {
+ #if OPENSSL_VERSION_NUMBER < 0x10100000L
+ int i;
+-
++
+ if (!CRYPTO_get_locking_callback()) {
+ rd_kafka_ssl_locks_cnt = CRYPTO_num_locks();
+ rd_kafka_ssl_locks = rd_malloc(rd_kafka_ssl_locks_cnt *
+@@ -736,11 +736,11 @@ static int rd_kafka_transport_ssl_connect (rd_kafka_broker_t *rkb,
+ return 0;
+ }
+
+-
++
+ if (rd_kafka_transport_ssl_io_update(rktrans, r,
+ errstr, errstr_size) == -1)
+ return -1;
+-
++
+ return 0;
+
+ fail:
+@@ -863,17 +863,6 @@ int rd_kafka_transport_ssl_ctx_init (rd_kafka_t *rk,
+ int r;
+ SSL_CTX *ctx;
+
+-#if OPENSSL_VERSION_NUMBER >= 0x10100000
+- rd_kafka_dbg(rk, SECURITY, "OPENSSL", "Using OpenSSL version %s "
+- "(0x%lx, librdkafka built with 0x%lx)",
+- OpenSSL_version(OPENSSL_VERSION),
+- OpenSSL_version_num(),
+- OPENSSL_VERSION_NUMBER);
+-#else
+- rd_kafka_dbg(rk, SECURITY, "OPENSSL", "librdkafka built with OpenSSL "
+- "version 0x%lx", OPENSSL_VERSION_NUMBER);
+-#endif
+-
+ if (errstr_size > 0)
+ errstr[0] = '\0';
+
+@@ -945,7 +934,7 @@ int rd_kafka_transport_ssl_ctx_init (rd_kafka_t *rk,
+ "Loading CA certificate(s) from %s %s",
+ is_dir ? "directory":"file",
+ rk->rk_conf.ssl.ca_location);
+-
++
+ r = SSL_CTX_load_verify_locations(ctx,
+ !is_dir ?
+ rk->rk_conf.ssl.
+@@ -1034,8 +1023,8 @@ int rd_kafka_transport_ssl_ctx_init (rd_kafka_t *rk,
+
+ if (!(fp = fopen(rk->rk_conf.ssl.keystore_location, "rb"))) {
+ rd_snprintf(errstr, errstr_size,
+- "Failed to open ssl.keystore.location: %s: %s",
+- rk->rk_conf.ssl.keystore_location,
++ "Failed to open ssl.keystore.location: %s: %s",
++ rk->rk_conf.ssl.keystore_location,
+ rd_strerror(errno));
+ goto fail;
+ }
+@@ -1495,7 +1484,7 @@ static void rd_kafka_transport_io_event (rd_kafka_transport_t *rktrans,
+ /**
+ * Poll and serve IOs
+ *
+- * Locality: broker thread
++ * Locality: broker thread
+ */
+ void rd_kafka_transport_io_serve (rd_kafka_transport_t *rktrans,
+ int timeout_ms) {
+@@ -1545,7 +1534,7 @@ rd_kafka_transport_t *rd_kafka_transport_connect (rd_kafka_broker_t *rkb,
+
+ #ifdef SO_NOSIGPIPE
+ /* Disable SIGPIPE signalling for this socket on OSX */
+- if (setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on)) == -1)
++ if (setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on)) == -1)
+ rd_rkb_dbg(rkb, BROKER, "SOCKET",
+ "Failed to set SO_NOSIGPIPE: %s",
+ socket_strerror(socket_errno));
+@@ -1709,7 +1698,7 @@ void rd_kafka_transport_term (void) {
+ #endif
+ }
+ #endif
+-
++
+ void rd_kafka_transport_init(void) {
+ #ifdef _MSC_VER
+ WSADATA d;