You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2019/09/30 08:44:42 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-1050 - Make
PublishKafka's Delivery Guarantee backwards compatible
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 8baf3be MINIFICPP-1050 - Make PublishKafka's Delivery Guarantee backwards compatible
8baf3be is described below
commit 8baf3bed60f9a08688db43c82ba01a2fea6b6718
Author: Daniel Bakai <ba...@gmail.com>
AuthorDate: Sat Sep 28 21:28:54 2019 +0200
MINIFICPP-1050 - Make PublishKafka's Delivery Guarantee backwards compatible
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #654
---
NOTICE | 1 +
extensions/librdkafka/PublishKafka.cpp | 20 +++++++++++++++++++-
2 files changed, 20 insertions(+), 1 deletion(-)
diff --git a/NOTICE b/NOTICE
index 7dcbc81..e8dcee8 100644
--- a/NOTICE
+++ b/NOTICE
@@ -34,3 +34,4 @@ This includes derived works from the CMake (BSD 3-Clause licensed) project (http
Copyright 2000-2019 Kitware, Inc. and Contributors
The derived work is adapted from
Modules/FindPatch.cmake
+and can be found in cmake/FindPatch.cmake
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index 75f8839..de72d23 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -45,7 +45,10 @@ core::Property PublishKafka::Topic(
->isRequired(true)->supportsExpressionLanguage(true)->build());
core::Property PublishKafka::DeliveryGuarantee(
- core::PropertyBuilder::createProperty("Delivery Guarantee")->withDescription("Specifies the requirement for guaranteeing that a message is sent to Kafka")
+ core::PropertyBuilder::createProperty("Delivery Guarantee")->withDescription("Specifies the requirement for guaranteeing that a message is sent to Kafka. "
+ "Valid values are 0 (do not wait for acks), "
+ "-1 or all (block until message is committed by all in sync replicas) "
+ "or any concrete number of nodes.")
->isRequired(false)->supportsExpressionLanguage(true)->withDefaultValue(DELIVERY_ONE_NODE)->build());
core::Property PublishKafka::MaxMessageSize(
@@ -411,6 +414,21 @@ bool PublishKafka::createNewTopic(const std::shared_ptr<KafkaConnection> &conn,
value = "";
if (context->getProperty(DeliveryGuarantee.getName(), value) && !value.empty()) {
+ /*
+ * Because of a previous error in this processor, the default value of this property was "DELIVERY_ONE_NODE".
+ * As this is not a valid value for "request.required.acks", the following rd_kafka_topic_conf_set call failed,
+ * but because of another error, this failure was silently ignored, meaning that the default value for
+ * "request.required.acks" did not change, and thus remained "-1". This means that having "DELIVERY_ONE_NODE" as
+ * the value of this property actually caused the processor to wait for delivery ACKs from ALL nodes, instead
+ * of just one. In order not to break configurations generated with earlier versions and keep the same behaviour
+ * as they had, we have to map "DELIVERY_ONE_NODE" to "-1" here.
+ */
+ if (value == "DELIVERY_ONE_NODE") {
+ value = "-1";
+ logger_->log_warn("Using DELIVERY_ONE_NODE as the Delivery Guarantee property is deprecated and is translated to -1 "
+ "(block until message is committed by all in sync replicas) for backwards compatibility. "
+ "If you want to wait for one acknowledgment use '1' as the property.");
+ }
result = rd_kafka_topic_conf_set(topic_conf_, "request.required.acks", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: request.required.acks [%s]", value);
if (result != RD_KAFKA_CONF_OK) {