You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/06/15 07:47:40 UTC

[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #811: MINIFICPP-1257 don't leak kafka messages on failed send to broker

adamdebreceni commented on a change in pull request #811:
URL: https://github.com/apache/nifi-minifi-cpp/pull/811#discussion_r439985423



##########
File path: extensions/librdkafka/PublishKafka.h
##########
@@ -233,9 +233,13 @@ class PublishKafka : public core::Processor {
 
       const gsl::owner<rd_kafka_headers_t*> hdrs_copy = rd_kafka_headers_copy(hdrs.get());
       const auto err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
-                                         RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
-      if (err) {
-        // the message only takes ownership of the headers in case of success
+                                         RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.get()), RD_KAFKA_V_END);
+      if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {

Review comment:
       👍

##########
File path: extensions/librdkafka/PublishKafka.h
##########
@@ -233,9 +233,13 @@ class PublishKafka : public core::Processor {
 
       const gsl::owner<rd_kafka_headers_t*> hdrs_copy = rd_kafka_headers_copy(hdrs.get());
       const auto err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
-                                         RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
-      if (err) {
-        // the message only takes ownership of the headers in case of success
+                                         RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.get()), RD_KAFKA_V_END);
+      if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
+        // in case of failure, messageDeliveryCallback is not called and callback_ptr will delete the callback
+        // in case of success, messageDeliveryCallback takes ownership of the callback, so we no longer need to delete it

Review comment:
       I'm not familiar with this processor, but I assume messageDeliveryCallback is called asynchronously. Could there be a scenario where messageDeliveryCallback is never eventually called?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org