You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2019/09/30 08:44:42 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-1050 - Make PublishKafka's Delivery Guarantee backwards compatible

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 8baf3be  MINIFICPP-1050 - Make PublishKafka's Delivery Guarantee backwards compatible
8baf3be is described below

commit 8baf3bed60f9a08688db43c82ba01a2fea6b6718
Author: Daniel Bakai <ba...@gmail.com>
AuthorDate: Sat Sep 28 21:28:54 2019 +0200

    MINIFICPP-1050 - Make PublishKafka's Delivery Guarantee backwards compatible
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #654
---
 NOTICE                                 |  1 +
 extensions/librdkafka/PublishKafka.cpp | 20 +++++++++++++++++++-
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/NOTICE b/NOTICE
index 7dcbc81..e8dcee8 100644
--- a/NOTICE
+++ b/NOTICE
@@ -34,3 +34,4 @@ This includes derived works from the CMake (BSD 3-Clause licensed) project (http
 Copyright 2000-2019 Kitware, Inc. and Contributors
 The derived work is adapted from
   Modules/FindPatch.cmake
+and can be found in cmake/FindPatch.cmake
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index 75f8839..de72d23 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -45,7 +45,10 @@ core::Property PublishKafka::Topic(
         ->isRequired(true)->supportsExpressionLanguage(true)->build());
 
 core::Property PublishKafka::DeliveryGuarantee(
-    core::PropertyBuilder::createProperty("Delivery Guarantee")->withDescription("Specifies the requirement for guaranteeing that a message is sent to Kafka")
+    core::PropertyBuilder::createProperty("Delivery Guarantee")->withDescription("Specifies the requirement for guaranteeing that a message is sent to Kafka. "
+                                                                                 "Valid values are 0 (do not wait for acks), "
+                                                                                 "-1 or all (block until message is committed by all in sync replicas) "
+                                                                                 "or any concrete number of nodes.")
         ->isRequired(false)->supportsExpressionLanguage(true)->withDefaultValue(DELIVERY_ONE_NODE)->build());
 
 core::Property PublishKafka::MaxMessageSize(
@@ -411,6 +414,21 @@ bool PublishKafka::createNewTopic(const std::shared_ptr<KafkaConnection> &conn,
 
   value = "";
   if (context->getProperty(DeliveryGuarantee.getName(), value) && !value.empty()) {
+    /*
+     * Because of a previous error in this processor, the default value of this property was "DELIVERY_ONE_NODE".
+     * As this is not a valid value for "request.required.acks", the following rd_kafka_topic_conf_set call failed,
+     * but because of another error, this failure was silently ignored, meaning that the default value for
+     * "request.required.acks" did not change, and thus remained "-1". This means that having "DELIVERY_ONE_NODE" as
+     * the value of this property actually caused the processor to wait for delivery ACKs from ALL nodes, instead
+     * of just one. In order not to break configurations generated with earlier versions and keep the same behaviour
+     * as they had, we have to map "DELIVERY_ONE_NODE" to "-1" here.
+     */
+    if (value == "DELIVERY_ONE_NODE") {
+      value = "-1";
+      logger_->log_warn("Using DELIVERY_ONE_NODE as the Delivery Guarantee property is deprecated and is translated to -1 "
+                        "(block until message is committed by all in sync replicas) for backwards compatibility. "
+                        "If you want to wait for one acknowledgment use '1' as the property.");
+    }
     result = rd_kafka_topic_conf_set(topic_conf_, "request.required.acks", value.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: request.required.acks [%s]", value);
     if (result != RD_KAFKA_CONF_OK) {