Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/01/20 14:02:31 UTC

[GitHub] [nifi-minifi-cpp] szaszm opened a new pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

szaszm opened a new pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712
 
 
   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] bakaid commented on issue #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
bakaid commented on issue #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#issuecomment-583418312
 
 
   @szaszm moving the make_headers call to the ReadCallback constructor body solves the issue.
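A likely reason this works: C++ initializes non-static data members in the order they are declared, not in the order they appear in the mem-initializer list. In the ReadCallback constructor from the hunks in this thread, hdrs(make_headers()) comes before attributeNameRegex_(attributeNameRegex), and make_headers() reads attributeNameRegex_. Assuming the members are declared in that same order, the call reads a reference member that has not been bound yet. In the constructor body every member is already initialized. The sketch below shows the pattern. HeaderCollector and its members are hypothetical stand-ins, with std::regex and std::map replacing utils::Regex and the flow file attributes.

```cpp
#include <cassert>
#include <map>
#include <regex>
#include <string>
#include <utility>
#include <vector>

using Headers = std::vector<std::pair<std::string, std::string>>;

// Hypothetical stand-in for ReadCallback. Members are initialized in the order
// they are declared at the bottom of the class, whatever the initializer-list order.
class HeaderCollector {
 public:
  HeaderCollector(std::map<std::string, std::string> attributes, const std::regex& attribute_name_regex)
      : attributes_(std::move(attributes)),
        attribute_name_regex_(attribute_name_regex) {
    // headers_(make_headers()) in the initializer list would run before
    // attribute_name_regex_ is bound, because headers_ is declared first.
    // By the time the constructor body runs, every member is initialized.
    headers_ = make_headers();
  }

  const Headers& headers() const { return headers_; }

 private:
  // Copies every attribute whose name fully matches the regex
  // (full match is a choice made for this sketch).
  Headers make_headers() const {
    Headers result;
    for (const auto& kv : attributes_) {
      if (std::regex_match(kv.first, attribute_name_regex_)) {
        result.emplace_back(kv.first, kv.second);
      }
    }
    return result;
  }

  std::map<std::string, std::string> attributes_;
  Headers headers_;                         // declared before the regex reference
  const std::regex& attribute_name_regex_;  // therefore initialized after headers_
};
```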

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r368570789
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.h
 ##########
 @@ -193,121 +186,150 @@ class PublishKafka : public core::Processor {
 
   // Nest Callback Class for read stream
   class ReadCallback : public InputStreamCallback {
+    void allocate_message_object(const size_t segment_num) const {
+      messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file) {
+        // allocate message object to be filled in by the callback in produce()
+        if (flow_file.messages.size() < segment_num + 1) {
+          flow_file.messages.resize(segment_num + 1);
+        }
+      });
+    }
+
+    rd_kafka_headers_s* make_headers() const {
+      rd_kafka_headers_s* const result = rd_kafka_headers_new(8);
+      for (const auto& kv : flowFile_->getAttributes()) {
+        if(attributeNameRegex_.match(kv.first)) {
+          rd_kafka_header_add(result, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
+        }
+      }
+      return result;
+    }
+
+    rd_kafka_resp_err_t produce(const size_t segment_num, std::vector<unsigned char>& buffer, const size_t buflen) const {
+      rd_kafka_resp_err_t err{};
+
+      const auto messages_copy = this->messages_;
+      const auto flow_file_index_copy = this->flow_file_index_;
+      const auto produce_callback = [messages_copy, flow_file_index_copy, segment_num](rd_kafka_t * /*rk*/, const rd_kafka_message_t *rkmessage) {
+        messages_copy->modifyResult(flow_file_index_copy, [segment_num, rkmessage](FlowFileResult &flow_file) {
+          auto &message = flow_file.messages.at(segment_num);
+          message.err_code = rkmessage->err;
+          message.status = message.err_code == 0 ? MessageStatus::MESSAGESTATUS_SUCCESS : MessageStatus::MESSAGESTATUS_ERROR;
+        });
+      };
+      // release()d below, deallocated in PublishKafka::messageDeliveryCallback
+      auto callback_ptr = utils::make_unique<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>>(std::move(produce_callback));
+
+      allocate_message_object(segment_num);
+
+      if (hdrs) {
+        rd_kafka_headers_t * const hdrs_copy = rd_kafka_headers_copy(hdrs);
+        err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+                                RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+        if (err) {
+          // the message only takes ownership of the headers in case of success
+          rd_kafka_headers_destroy(hdrs_copy);
+        }
+      } else {
+        err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+                                RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+      }
+      return err;
+    }
+
    public:
-    ReadCallback(uint64_t max_seg_size,
-                 const std::string &key,
-                 rd_kafka_topic_t *rkt,
-                 rd_kafka_t *rk,
-                 const std::shared_ptr<core::FlowFile> flowFile,
+    ReadCallback(const uint64_t max_seg_size,
+                 std::string key,
+                 rd_kafka_topic_t * const rkt,
+                 rd_kafka_t * const rk,
+                 std::shared_ptr<core::FlowFile> flowFile,
                  utils::Regex &attributeNameRegex,
                  std::shared_ptr<Messages> messages,
-                 size_t flow_file_index)
-        : max_seg_size_(max_seg_size),
-          key_(key),
+                 const size_t flow_file_index)
+        : flowFile_(std::move(flowFile)),
+          flow_size_(flowFile_->getSize()),
+          max_seg_size_(max_seg_size == 0 || flow_size_ < max_seg_size ? flow_size_ : max_seg_size),
+          key_(std::move(key)),
           rkt_(rkt),
           rk_(rk),
-          flowFile_(flowFile),
+          hdrs(make_headers()),
           messages_(std::move(messages)),
           flow_file_index_(flow_file_index),
-          attributeNameRegex_(attributeNameRegex) {
-      flow_size_ = flowFile_->getSize();
-      status_ = 0;
-      read_size_ = 0;
-      hdrs = nullptr;
-    }
+          status_(0),
+          read_size_(0),
+          attributeNameRegex_(attributeNameRegex)
+    { }
 
     ~ReadCallback() {
       if (hdrs) {
         rd_kafka_headers_destroy(hdrs);
       }
     }
 
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
-      if (max_seg_size_ == 0U || flow_size_ < max_seg_size_) {
-        max_seg_size_ = flow_size_;
-      }
+    int64_t process(const std::shared_ptr<io::BaseStream> stream) {
       std::vector<unsigned char> buffer;
-      buffer.reserve(max_seg_size_);
+      buffer.resize(max_seg_size_);
       read_size_ = 0;
       status_ = 0;
       rd_kafka_resp_err_t err;
 
-      for (auto kv : flowFile_->getAttributes()) {
-        if(attributeNameRegex_.match(kv.first)) {
-          if (!hdrs) {
-            hdrs = rd_kafka_headers_new(8);
-          }
-          err = rd_kafka_header_add(hdrs, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
-        }
-      }
+      called_ = true;
 
       size_t segment_num = 0U;
+
+      assert(flow_size_ == 0 || max_seg_size_ != 0 && "at this point, max_seg_size_ is only zero if flow_size_ is zero");
 
 Review comment:
   An assert doesn't really compile to anything here (we mainly build release with debug symbols, where NDEBUG is defined), so it doesn't add much value.
   Logging an error here might help more.
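Here is a minimal sketch of the suggestion. It assumes the invariant check moves into a plain function that returns an error text, which the caller passes to the processor's logger (the hunks use logger_->log_debug and logger_->log_info). segment_size_is_valid is a hypothetical name. Unlike assert(), this check is not removed when NDEBUG is defined, and CMake's default RelWithDebInfo flags do define it.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical helper, not part of MiNiFi C++. It runs in every build type,
// whereas assert() expands to nothing when NDEBUG is defined.
// On failure it fills `error` so the caller can log it.
bool segment_size_is_valid(std::size_t flow_size, std::size_t max_seg_size, std::string& error) {
  // Invariant from the diff: max_seg_size may only be zero for an empty flow file.
  if (flow_size != 0 && max_seg_size == 0) {
    error = "max_seg_size is 0 for a non-empty flow file of size " + std::to_string(flow_size);
    return false;
  }
  return true;
}
```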

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r369413767
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -99,6 +99,13 @@ core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of a
                                              "");
 core::Property PublishKafka::DebugContexts("Debug contexts", "A comma-separated list of debug contexts to enable."
                                            "Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all", "");
+const core::Property PublishKafka::FailEmptyFlowFiles(
+    core::PropertyBuilder::createProperty("Fail empty flow files")
+        ->withDescription("Keep backwards compatibility with <=0.7.0 bug which caused flow files with empty content to not be published to Kafka and forwarded to failure. The old behavior is "
+                          "deprecated. Use connections to drop empty flow files!")
+        ->isRequired(false)
 
 Review comment:
   I think it should be required. 

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r368569213
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -99,6 +99,14 @@ core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of a
                                              "");
 core::Property PublishKafka::DebugContexts("Debug contexts", "A comma-separated list of debug contexts to enable."
                                            "Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all", "");
+const core::Property PublishKafka::DropEmptyFlowFiles(
+    core::PropertyBuilder::createProperty("Drop empty flow files")
+        ->withDescription("Keep backwards compatibility with <=0.7.0 bug which caused flow files with empty content to not be published to Kafka. The old behavior is deprecated. Use connections "
+                          "to drop empty flow files!")
+        ->isRequired(false)
+        ->withDefaultValue<bool>(true)
+        ->build()
+);
 
 Review comment:
   Just nitpicking, but it's usually at the end of the last call (usually "build")

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r368579443
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -639,6 +647,17 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
     PublishKafka::ReadCallback callback(max_flow_seg_size, kafkaKey, thisTopic->getTopic(), conn->getConnection(), flowFile,
                                         attributeNameRegex, messages, flow_file_index);
     session->read(flowFile, &callback);
+
+    bool dropEmptyFlowFiles;
+    if (!callback.called_ && context->getProperty(DropEmptyFlowFiles.getName(), dropEmptyFlowFiles) && !dropEmptyFlowFiles) {
+      // workaround: call callback since ProcessSession doesn't do so for empty flow files
+      logger_->log_debug("ReadCallback workaround on empty flow file, because DropEmptyFlowFiles is false, uuid: %s", flowFile->getUUIDStr());
+      callback.process(nullptr);
+    }
+    if (!callback.called_ && dropEmptyFlowFiles) {
+      logger_->log_info("Deprecated behavior, use connections to drop empty flow files! Dropped empty flow file with uuid: %s", flowFile->getUUIDStr());
 
 Review comment:
   We cannot move this to Connection, since the data loss happens in the processors, and a Connection has no way of knowing whether the empty flow file was processed or not.
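The onTrigger hunk above decides based on two inputs that only the processor knows: whether the ReadCallback ran, and the value of "Drop empty flow files". The sketch below summarizes that branching as a function. The enum and function names are hypothetical. The property is modeled as std::optional<bool> and falls back to the documented default of true. In the hunk, bool dropEmptyFlowFiles is left uninitialized, and it appears to be read in the second if when getProperty fails without writing it. An explicit fallback avoids that.

```cpp
#include <cassert>
#include <optional>

// Hypothetical names; mirrors the branching in the onTrigger hunk.
enum class EmptyFlowFileAction {
  AlreadyProcessed,  // ProcessSession invoked the ReadCallback (non-empty content)
  PublishEmpty,      // invoke the callback manually so an empty message is sent
  DropDeprecated     // old (<= 0.7.0) behavior: log and drop the flow file
};

EmptyFlowFileAction decide_empty_flow_file_action(bool callback_called,
                                                  std::optional<bool> drop_empty_flow_files) {
  if (callback_called) {
    return EmptyFlowFileAction::AlreadyProcessed;
  }
  // Use the property's default (true) instead of reading an unset variable.
  const bool drop = drop_empty_flow_files.value_or(true);
  return drop ? EmptyFlowFileAction::DropDeprecated : EmptyFlowFileAction::PublishEmpty;
}
```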

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r368579900
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -99,6 +99,14 @@ core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of a
                                              "");
 core::Property PublishKafka::DebugContexts("Debug contexts", "A comma-separated list of debug contexts to enable."
                                            "Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all", "");
+const core::Property PublishKafka::DropEmptyFlowFiles(
+    core::PropertyBuilder::createProperty("Drop empty flow files")
+        ->withDescription("Keep backwards compatibility with <=0.7.0 bug which caused flow files with empty content to not be published to Kafka. The old behavior is deprecated. Use connections "
+                          "to drop empty flow files!")
+        ->isRequired(false)
+        ->withDefaultValue<bool>(true)
+        ->build()
+);
 
 Review comment:
   What is at the end of the last call? Do you mean the closing paren and semicolon?

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r377034602
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.h
 ##########
 @@ -194,120 +187,142 @@ class PublishKafka : public core::Processor {
   // Nest Callback Class for read stream
   class ReadCallback : public InputStreamCallback {
    public:
-    ReadCallback(uint64_t max_seg_size,
-                 const std::string &key,
-                 rd_kafka_topic_t *rkt,
-                 rd_kafka_t *rk,
-                 const std::shared_ptr<core::FlowFile> flowFile,
+    struct rd_kafka_headers_deleter {
+      void operator()(rd_kafka_headers_t* ptr) const noexcept {
+        rd_kafka_headers_destroy(ptr);
+      }
+    };
+
+    using rd_kafka_headers_unique_ptr = std::unique_ptr<rd_kafka_headers_t, rd_kafka_headers_deleter>;
+
+   private:
+    void allocate_message_object(const size_t segment_num) const {
+      messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file) {
+        // allocate message object to be filled in by the callback in produce()
+        if (flow_file.messages.size() < segment_num + 1) {
+          flow_file.messages.resize(segment_num + 1);
+        }
+      });
+    }
+
+    static rd_kafka_headers_unique_ptr make_headers(const core::FlowFile& flow_file, utils::Regex& attribute_name_regex) {
+      const utils::owner<rd_kafka_headers_t*> result{ rd_kafka_headers_new(8) };
+      if (!result) { throw std::bad_alloc{}; }
+
+      for (const auto& kv : flow_file.getAttributes()) {
 
 Review comment:
   Rebased on top of dfbe225. That should solve the issue.
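This sketch combines the rd_kafka_headers_unique_ptr pattern from this hunk with the rule stated in the produce() hunks: "the message only takes ownership of the headers in case of success". All fake_* names are hypothetical stand-ins for a C API; they are not librdkafka functions. Two properties are worth noting. First, std::unique_ptr never invokes its deleter on nullptr, so a destructor needs no explicit null check. Second, calling release() only after a successful produce keeps the failure path leak-free. By the same reasoning, callback_ptr.release() is evaluated inside the rd_kafka_producev argument list, so it appears to leak the callback when the produce call fails. This assumes no delivery report is issued for a message that was never enqueued.

```cpp
#include <cassert>
#include <memory>
#include <new>

// Hypothetical C-style API standing in for rd_kafka_headers_new/_copy/_destroy.
struct fake_headers { int capacity; };
int live_headers = 0;  // allocations not yet destroyed

fake_headers* fake_headers_new(int capacity) { ++live_headers; return new fake_headers{capacity}; }
fake_headers* fake_headers_copy(const fake_headers* h) { ++live_headers; return new fake_headers(*h); }
void fake_headers_destroy(fake_headers* h) { --live_headers; delete h; }

// Simulated produce call: adopts the headers only on success.
fake_headers* queued = nullptr;
int fake_produce(fake_headers* h, bool fail) {
  if (fail) return -1;  // nothing adopted, the caller still owns h
  queued = h;           // adopted by the "library" until delivery
  return 0;
}

// Same shape as rd_kafka_headers_deleter in the hunk above.
struct fake_headers_deleter {
  void operator()(fake_headers* ptr) const noexcept { fake_headers_destroy(ptr); }
};
using fake_headers_unique_ptr = std::unique_ptr<fake_headers, fake_headers_deleter>;

fake_headers_unique_ptr make_headers() {
  fake_headers_unique_ptr result{fake_headers_new(8)};
  if (!result) { throw std::bad_alloc{}; }  // mirrors the null check in the hunk
  return result;
}

// Produces with a private copy of the headers; release() only on success.
int produce(const fake_headers_unique_ptr& hdrs, bool fail) {
  fake_headers_unique_ptr copy{fake_headers_copy(hdrs.get())};
  const int err = fake_produce(copy.get(), fail);
  if (err == 0) {
    copy.release();  // ownership moved to the queued message
  }
  return err;        // on failure, `copy` destroys the headers here
}
```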

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r369638075
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -99,6 +99,13 @@ core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of a
                                              "");
 core::Property PublishKafka::DebugContexts("Debug contexts", "A comma-separated list of debug contexts to enable."
                                            "Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all", "");
+const core::Property PublishKafka::FailEmptyFlowFiles(
+    core::PropertyBuilder::createProperty("Fail empty flow files")
+        ->withDescription("Keep backwards compatibility with <=0.7.0 bug which caused flow files with empty content to not be published to Kafka and forwarded to failure. The old behavior is "
+                          "deprecated. Use connections to drop empty flow files!")
+        ->isRequired(false)
 
 Review comment:
   From our perspective, it does, as whenever we get the value, there is a valid value.
   
   Required would normally force the user to specify something; in this case that doesn't apply, so it ensures backward compatibility as you highlighted.
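The sketch below illustrates the distinction with a simplified, hypothetical property model. It is not the MiNiFi core::Property API. A required property fails validation only when it has neither a configured value nor a default. So a required property with a default never forces the user to set anything, and reading it always yields a valid value.

```cpp
#include <cassert>
#include <optional>
#include <string>

// Hypothetical, simplified model of a processor property.
struct BoolProperty {
  std::string name;
  bool required;
  std::optional<bool> default_value;
  std::optional<bool> configured_value;  // what the user set in the flow config, if anything

  // A required property is satisfied by either a user value or a default.
  bool is_valid() const {
    return !required || configured_value.has_value() || default_value.has_value();
  }

  // The value the processor sees: the user value first, then the default.
  std::optional<bool> value() const {
    return configured_value ? configured_value : default_value;
  }
};
```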

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r369416049
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.h
 ##########
 @@ -193,121 +186,146 @@ class PublishKafka : public core::Processor {
 
   // Nest Callback Class for read stream
   class ReadCallback : public InputStreamCallback {
+    void allocate_message_object(const size_t segment_num) const {
+      messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file) {
+        // allocate message object to be filled in by the callback in produce()
+        if (flow_file.messages.size() < segment_num + 1) {
+          flow_file.messages.resize(segment_num + 1);
+        }
+      });
+    }
+
+    utils::owner<rd_kafka_headers_s*> make_headers() const {
+      rd_kafka_headers_s* const result = rd_kafka_headers_new(8);
+      for (const auto& kv : flowFile_->getAttributes()) {
+        if(attributeNameRegex_.match(kv.first)) {
+          rd_kafka_header_add(result, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
+        }
+      }
+      return result;
+    }
+
+    rd_kafka_resp_err_t produce(const size_t segment_num, std::vector<unsigned char>& buffer, const size_t buflen) const {
+      const auto messages_copy = this->messages_;
+      const auto flow_file_index_copy = this->flow_file_index_;
+      const auto produce_callback = [messages_copy, flow_file_index_copy, segment_num](rd_kafka_t * /*rk*/, const rd_kafka_message_t *rkmessage) {
+        messages_copy->modifyResult(flow_file_index_copy, [segment_num, rkmessage](FlowFileResult &flow_file) {
+          auto &message = flow_file.messages.at(segment_num);
+          message.err_code = rkmessage->err;
+          message.status = message.err_code == 0 ? MessageStatus::MESSAGESTATUS_SUCCESS : MessageStatus::MESSAGESTATUS_ERROR;
+        });
+      };
+      // release()d below, deallocated in PublishKafka::messageDeliveryCallback
+      auto callback_ptr = utils::make_unique<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>>(std::move(produce_callback));
+
+      allocate_message_object(segment_num);
+
+      if (hdrs) {
+        const utils::owner<rd_kafka_headers_t*> hdrs_copy = rd_kafka_headers_copy(hdrs);
+        const auto err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+                                           RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+        if (err) {
+          // the message only takes ownership of the headers in case of success
+          rd_kafka_headers_destroy(hdrs_copy);
+        }
+        return err;
+      } else {
+        return rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+                                RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+      }
+    }
+
    public:
-    ReadCallback(uint64_t max_seg_size,
-                 const std::string &key,
-                 rd_kafka_topic_t *rkt,
-                 rd_kafka_t *rk,
-                 const std::shared_ptr<core::FlowFile> flowFile,
+    ReadCallback(const uint64_t max_seg_size,
+                 std::string key,
+                 rd_kafka_topic_t * const rkt,
+                 rd_kafka_t * const rk,
+                 std::shared_ptr<core::FlowFile> flowFile,
                  utils::Regex &attributeNameRegex,
                  std::shared_ptr<Messages> messages,
-                 size_t flow_file_index)
-        : max_seg_size_(max_seg_size),
-          key_(key),
+                 const size_t flow_file_index,
+                 const bool fail_empty_flow_files)
+        : flowFile_(std::move(flowFile)),
+          flow_size_(flowFile_->getSize()),
+          max_seg_size_(max_seg_size == 0 || flow_size_ < max_seg_size ? flow_size_ : max_seg_size),
+          key_(std::move(key)),
           rkt_(rkt),
           rk_(rk),
-          flowFile_(flowFile),
+          hdrs(make_headers()),
           messages_(std::move(messages)),
           flow_file_index_(flow_file_index),
-          attributeNameRegex_(attributeNameRegex) {
-      flow_size_ = flowFile_->getSize();
-      status_ = 0;
-      read_size_ = 0;
-      hdrs = nullptr;
-    }
+          status_(0),
+          read_size_(0),
+          attributeNameRegex_(attributeNameRegex),
+          fail_empty_flow_files_(fail_empty_flow_files)
+    { }
 
     ~ReadCallback() {
-      if (hdrs) {
-        rd_kafka_headers_destroy(hdrs);
-      }
+      rd_kafka_headers_destroy(hdrs);
     }
 
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
-      if (max_seg_size_ == 0U || flow_size_ < max_seg_size_) {
-        max_seg_size_ = flow_size_;
-      }
+    int64_t process(const std::shared_ptr<io::BaseStream> stream) {
       std::vector<unsigned char> buffer;
-      buffer.reserve(max_seg_size_);
+
+      buffer.resize(max_seg_size_);
       read_size_ = 0;
       status_ = 0;
-      rd_kafka_resp_err_t err;
+      called_ = true;
 
-      for (auto kv : flowFile_->getAttributes()) {
-        if(attributeNameRegex_.match(kv.first)) {
-          if (!hdrs) {
-            hdrs = rd_kafka_headers_new(8);
-          }
-          err = rd_kafka_header_add(hdrs, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
-        }
+      assert(max_seg_size_ != 0 || flow_size_ == 0 && "max_seg_size_ == 0 implies flow_size_ == 0");
+      // ^^ therefore checking max_seg_size_ == 0 handles both division by zero and flow_size_ == 0 cases
+      const size_t reserved_msg_capacity = max_seg_size_ == 0 ? 1 : utils::intdiv_ceil(flow_size_, max_seg_size_);
+      messages_->modifyResult(flow_file_index_, [reserved_msg_capacity](FlowFileResult& flow_file) {
+        flow_file.messages.reserve(reserved_msg_capacity);
+      });
+
+      // If the flow file is empty, we still want to send the message, unless the user wants to fail_empty_flow_files_
 
 Review comment:
   The default value of the property is now true, which means that in this case we drop the files.
   This is not backward-compatible behaviour.
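For reference, the process() hunk above uses the following segment arithmetic. The constructor clamps max_seg_size_ to the flow file size, or uses the flow file size when the configured value is 0. process() then reserves intdiv_ceil(flow_size_, max_seg_size_) message slots, or 1 when max_seg_size_ is 0, which can only happen for an empty flow file. The sketch below reproduces that arithmetic with a local ceiling-division helper. The thread does not show the real utils::intdiv_ceil signature, so the one here is an assumption.

```cpp
#include <cassert>
#include <cstdint>

// Local ceiling division for a non-zero divisor (assumed equivalent to utils::intdiv_ceil).
constexpr std::uint64_t intdiv_ceil(std::uint64_t numerator, std::uint64_t denominator) {
  return numerator / denominator + (numerator % denominator != 0 ? 1 : 0);
}

// Mirrors the constructor: 0, or a limit larger than the file, means "one segment".
constexpr std::uint64_t effective_segment_size(std::uint64_t flow_size, std::uint64_t max_seg_size) {
  return (max_seg_size == 0 || flow_size < max_seg_size) ? flow_size : max_seg_size;
}

// Mirrors process(): an empty flow file still reserves one message slot.
constexpr std::uint64_t message_count(std::uint64_t flow_size, std::uint64_t max_seg_size) {
  const std::uint64_t seg = effective_segment_size(flow_size, max_seg_size);
  return seg == 0 ? 1 : intdiv_ceil(flow_size, seg);
}
```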

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r369471703
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.h
 ##########
 @@ -193,121 +186,146 @@ class PublishKafka : public core::Processor {
 
   // Nest Callback Class for read stream
   class ReadCallback : public InputStreamCallback {
+    void allocate_message_object(const size_t segment_num) const {
+      messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file) {
+        // allocate message object to be filled in by the callback in produce()
+        if (flow_file.messages.size() < segment_num + 1) {
+          flow_file.messages.resize(segment_num + 1);
+        }
+      });
+    }
+
+    utils::owner<rd_kafka_headers_s*> make_headers() const {
+      rd_kafka_headers_s* const result = rd_kafka_headers_new(8);
+      for (const auto& kv : flowFile_->getAttributes()) {
+        if(attributeNameRegex_.match(kv.first)) {
+          rd_kafka_header_add(result, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
+        }
+      }
+      return result;
+    }
+
+    rd_kafka_resp_err_t produce(const size_t segment_num, std::vector<unsigned char>& buffer, const size_t buflen) const {
+      const auto messages_copy = this->messages_;
+      const auto flow_file_index_copy = this->flow_file_index_;
+      const auto produce_callback = [messages_copy, flow_file_index_copy, segment_num](rd_kafka_t * /*rk*/, const rd_kafka_message_t *rkmessage) {
+        messages_copy->modifyResult(flow_file_index_copy, [segment_num, rkmessage](FlowFileResult &flow_file) {
+          auto &message = flow_file.messages.at(segment_num);
+          message.err_code = rkmessage->err;
+          message.status = message.err_code == 0 ? MessageStatus::MESSAGESTATUS_SUCCESS : MessageStatus::MESSAGESTATUS_ERROR;
+        });
+      };
+      // release()d below, deallocated in PublishKafka::messageDeliveryCallback
+      auto callback_ptr = utils::make_unique<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>>(std::move(produce_callback));
+
+      allocate_message_object(segment_num);
+
+      if (hdrs) {
+        const utils::owner<rd_kafka_headers_t*> hdrs_copy = rd_kafka_headers_copy(hdrs);
+        const auto err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+                                           RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+        if (err) {
+          // the message only takes ownership of the headers in case of success
+          rd_kafka_headers_destroy(hdrs_copy);
+        }
+        return err;
+      } else {
+        return rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+                                RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+      }
+    }
+
    public:
-    ReadCallback(uint64_t max_seg_size,
-                 const std::string &key,
-                 rd_kafka_topic_t *rkt,
-                 rd_kafka_t *rk,
-                 const std::shared_ptr<core::FlowFile> flowFile,
+    ReadCallback(const uint64_t max_seg_size,
+                 std::string key,
+                 rd_kafka_topic_t * const rkt,
+                 rd_kafka_t * const rk,
+                 std::shared_ptr<core::FlowFile> flowFile,
                  utils::Regex &attributeNameRegex,
                  std::shared_ptr<Messages> messages,
-                 size_t flow_file_index)
-        : max_seg_size_(max_seg_size),
-          key_(key),
+                 const size_t flow_file_index,
+                 const bool fail_empty_flow_files)
+        : flowFile_(std::move(flowFile)),
+          flow_size_(flowFile_->getSize()),
+          max_seg_size_(max_seg_size == 0 || flow_size_ < max_seg_size ? flow_size_ : max_seg_size),
+          key_(std::move(key)),
           rkt_(rkt),
           rk_(rk),
-          flowFile_(flowFile),
+          hdrs(make_headers()),
           messages_(std::move(messages)),
           flow_file_index_(flow_file_index),
-          attributeNameRegex_(attributeNameRegex) {
-      flow_size_ = flowFile_->getSize();
-      status_ = 0;
-      read_size_ = 0;
-      hdrs = nullptr;
-    }
+          status_(0),
+          read_size_(0),
+          attributeNameRegex_(attributeNameRegex),
+          fail_empty_flow_files_(fail_empty_flow_files)
+    { }
 
     ~ReadCallback() {
-      if (hdrs) {
-        rd_kafka_headers_destroy(hdrs);
-      }
+      rd_kafka_headers_destroy(hdrs);
     }
 
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
-      if (max_seg_size_ == 0U || flow_size_ < max_seg_size_) {
-        max_seg_size_ = flow_size_;
-      }
+    int64_t process(const std::shared_ptr<io::BaseStream> stream) {
       std::vector<unsigned char> buffer;
-      buffer.reserve(max_seg_size_);
+
+      buffer.resize(max_seg_size_);
       read_size_ = 0;
       status_ = 0;
-      rd_kafka_resp_err_t err;
+      called_ = true;
 
-      for (auto kv : flowFile_->getAttributes()) {
-        if(attributeNameRegex_.match(kv.first)) {
-          if (!hdrs) {
-            hdrs = rd_kafka_headers_new(8);
-          }
-          err = rd_kafka_header_add(hdrs, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
-        }
+      assert(max_seg_size_ != 0 || flow_size_ == 0 && "max_seg_size_ == 0 implies flow_size_ == 0");
+      // ^^ therefore checking max_seg_size_ == 0 handles both division by zero and flow_size_ == 0 cases
+      const size_t reserved_msg_capacity = max_seg_size_ == 0 ? 1 : utils::intdiv_ceil(flow_size_, max_seg_size_);
+      messages_->modifyResult(flow_file_index_, [reserved_msg_capacity](FlowFileResult& flow_file) {
+        flow_file.messages.reserve(reserved_msg_capacity);
+      });
+
+      // If the flow file is empty, we still want to send the message, unless the user wants to fail_empty_flow_files_
 
 Review comment:
   It is backwards compatible, but not desired. That behavior is why the issue exists.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r368678139
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.h
 ##########
 @@ -193,121 +186,150 @@ class PublishKafka : public core::Processor {
 
   // Nest Callback Class for read stream
   class ReadCallback : public InputStreamCallback {
+    void allocate_message_object(const size_t segment_num) const {
+      messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file) {
+        // allocate message object to be filled in by the callback in produce()
+        if (flow_file.messages.size() < segment_num + 1) {
+          flow_file.messages.resize(segment_num + 1);
+        }
+      });
+    }
+
+    rd_kafka_headers_s* make_headers() const {
+      rd_kafka_headers_s* const result = rd_kafka_headers_new(8);
+      for (const auto& kv : flowFile_->getAttributes()) {
+        if(attributeNameRegex_.match(kv.first)) {
+          rd_kafka_header_add(result, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
+        }
+      }
+      return result;
+    }
+
+    rd_kafka_resp_err_t produce(const size_t segment_num, std::vector<unsigned char>& buffer, const size_t buflen) const {
+      rd_kafka_resp_err_t err{};
+
+      const auto messages_copy = this->messages_;
+      const auto flow_file_index_copy = this->flow_file_index_;
+      const auto produce_callback = [messages_copy, flow_file_index_copy, segment_num](rd_kafka_t * /*rk*/, const rd_kafka_message_t *rkmessage) {
+        messages_copy->modifyResult(flow_file_index_copy, [segment_num, rkmessage](FlowFileResult &flow_file) {
+          auto &message = flow_file.messages.at(segment_num);
+          message.err_code = rkmessage->err;
+          message.status = message.err_code == 0 ? MessageStatus::MESSAGESTATUS_SUCCESS : MessageStatus::MESSAGESTATUS_ERROR;
+        });
+      };
+      // release()d below, deallocated in PublishKafka::messageDeliveryCallback
+      auto callback_ptr = utils::make_unique<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>>(std::move(produce_callback));
+
+      allocate_message_object(segment_num);
+
+      if (hdrs) {
+        rd_kafka_headers_t * const hdrs_copy = rd_kafka_headers_copy(hdrs);
+        err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+                                RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+        if (err) {
+          // the message only takes ownership of the headers in case of success
+          rd_kafka_headers_destroy(hdrs_copy);
+        }
+      } else {
+        err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+                                RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+      }
+      return err;
+    }
+
    public:
-    ReadCallback(uint64_t max_seg_size,
-                 const std::string &key,
-                 rd_kafka_topic_t *rkt,
-                 rd_kafka_t *rk,
-                 const std::shared_ptr<core::FlowFile> flowFile,
+    ReadCallback(const uint64_t max_seg_size,
+                 std::string key,
+                 rd_kafka_topic_t * const rkt,
+                 rd_kafka_t * const rk,
+                 std::shared_ptr<core::FlowFile> flowFile,
                  utils::Regex &attributeNameRegex,
                  std::shared_ptr<Messages> messages,
-                 size_t flow_file_index)
-        : max_seg_size_(max_seg_size),
-          key_(key),
+                 const size_t flow_file_index)
+        : flowFile_(std::move(flowFile)),
+          flow_size_(flowFile_->getSize()),
+          max_seg_size_(max_seg_size == 0 || flow_size_ < max_seg_size ? flow_size_ : max_seg_size),
+          key_(std::move(key)),
           rkt_(rkt),
           rk_(rk),
-          flowFile_(flowFile),
+          hdrs(make_headers()),
           messages_(std::move(messages)),
           flow_file_index_(flow_file_index),
-          attributeNameRegex_(attributeNameRegex) {
-      flow_size_ = flowFile_->getSize();
-      status_ = 0;
-      read_size_ = 0;
-      hdrs = nullptr;
-    }
+          status_(0),
+          read_size_(0),
+          attributeNameRegex_(attributeNameRegex)
+    { }
 
     ~ReadCallback() {
       if (hdrs) {
         rd_kafka_headers_destroy(hdrs);
       }
     }
 
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
-      if (max_seg_size_ == 0U || flow_size_ < max_seg_size_) {
-        max_seg_size_ = flow_size_;
-      }
+    int64_t process(const std::shared_ptr<io::BaseStream> stream) {
       std::vector<unsigned char> buffer;
-      buffer.reserve(max_seg_size_);
+      buffer.resize(max_seg_size_);
       read_size_ = 0;
       status_ = 0;
       rd_kafka_resp_err_t err;
 
-      for (auto kv : flowFile_->getAttributes()) {
-        if(attributeNameRegex_.match(kv.first)) {
-          if (!hdrs) {
-            hdrs = rd_kafka_headers_new(8);
-          }
-          err = rd_kafka_header_add(hdrs, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
-        }
-      }
+      called_ = true;
 
       size_t segment_num = 0U;
+
+      assert(flow_size_ == 0 || max_seg_size_ != 0 && "at this point, max_seg_size_ is only zero if flow_size_ is zero");
 
 Review comment:
   fixed in c872f3d

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r368581257
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -639,6 +647,17 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
     PublishKafka::ReadCallback callback(max_flow_seg_size, kafkaKey, thisTopic->getTopic(), conn->getConnection(), flowFile,
                                         attributeNameRegex, messages, flow_file_index);
     session->read(flowFile, &callback);
+
+    bool dropEmptyFlowFiles;
+    if (!callback.called_ && context->getProperty(DropEmptyFlowFiles.getName(), dropEmptyFlowFiles) && !dropEmptyFlowFiles) {
+      // workaround: call callback since ProcessSession doesn't do so for empty flow files
+      logger_->log_debug("ReadCallback workaround on empty flow file, because DropEmptyFlowFiles is false, uuid: %s", flowFile->getUUIDStr());
+      callback.process(nullptr);
+    }
+    if (!callback.called_ && dropEmptyFlowFiles) {
+      logger_->log_info("Deprecated behavior, use connections to drop empty flow files! Dropped empty flow file with uuid: %s", flowFile->getUUIDStr());
 
 Review comment:
   Fair point :)

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r368678166
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -639,6 +647,17 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
     PublishKafka::ReadCallback callback(max_flow_seg_size, kafkaKey, thisTopic->getTopic(), conn->getConnection(), flowFile,
                                         attributeNameRegex, messages, flow_file_index);
     session->read(flowFile, &callback);
+
+    bool dropEmptyFlowFiles;
+    if (!callback.called_ && context->getProperty(DropEmptyFlowFiles.getName(), dropEmptyFlowFiles) && !dropEmptyFlowFiles) {
 
 Review comment:
   fixed in c872f3d

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r368581719
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.h
 ##########
 @@ -193,121 +186,150 @@ class PublishKafka : public core::Processor {
 
   // Nest Callback Class for read stream
   class ReadCallback : public InputStreamCallback {
+    void allocate_message_object(const size_t segment_num) const {
+      messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file) {
+        // allocate message object to be filled in by the callback in produce()
+        if (flow_file.messages.size() < segment_num + 1) {
+          flow_file.messages.resize(segment_num + 1);
+        }
+      });
+    }
+
+    rd_kafka_headers_s* make_headers() const {
+      rd_kafka_headers_s* const result = rd_kafka_headers_new(8);
+      for (const auto& kv : flowFile_->getAttributes()) {
+        if(attributeNameRegex_.match(kv.first)) {
+          rd_kafka_header_add(result, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
+        }
+      }
+      return result;
+    }
+
+    rd_kafka_resp_err_t produce(const size_t segment_num, std::vector<unsigned char>& buffer, const size_t buflen) const {
+      rd_kafka_resp_err_t err{};
+
+      const auto messages_copy = this->messages_;
+      const auto flow_file_index_copy = this->flow_file_index_;
+      const auto produce_callback = [messages_copy, flow_file_index_copy, segment_num](rd_kafka_t * /*rk*/, const rd_kafka_message_t *rkmessage) {
+        messages_copy->modifyResult(flow_file_index_copy, [segment_num, rkmessage](FlowFileResult &flow_file) {
+          auto &message = flow_file.messages.at(segment_num);
+          message.err_code = rkmessage->err;
+          message.status = message.err_code == 0 ? MessageStatus::MESSAGESTATUS_SUCCESS : MessageStatus::MESSAGESTATUS_ERROR;
+        });
+      };
+      // release()d below, deallocated in PublishKafka::messageDeliveryCallback
+      auto callback_ptr = utils::make_unique<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>>(std::move(produce_callback));
+
+      allocate_message_object(segment_num);
+
+      if (hdrs) {
+        rd_kafka_headers_t * const hdrs_copy = rd_kafka_headers_copy(hdrs);
+        err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+                                RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+        if (err) {
+          // the message only takes ownership of the headers in case of success
+          rd_kafka_headers_destroy(hdrs_copy);
+        }
+      } else {
+        err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+                                RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+      }
+      return err;
+    }
+
    public:
-    ReadCallback(uint64_t max_seg_size,
-                 const std::string &key,
-                 rd_kafka_topic_t *rkt,
-                 rd_kafka_t *rk,
-                 const std::shared_ptr<core::FlowFile> flowFile,
+    ReadCallback(const uint64_t max_seg_size,
+                 std::string key,
+                 rd_kafka_topic_t * const rkt,
+                 rd_kafka_t * const rk,
+                 std::shared_ptr<core::FlowFile> flowFile,
                  utils::Regex &attributeNameRegex,
                  std::shared_ptr<Messages> messages,
-                 size_t flow_file_index)
-        : max_seg_size_(max_seg_size),
-          key_(key),
+                 const size_t flow_file_index)
+        : flowFile_(std::move(flowFile)),
+          flow_size_(flowFile_->getSize()),
+          max_seg_size_(max_seg_size == 0 || flow_size_ < max_seg_size ? flow_size_ : max_seg_size),
+          key_(std::move(key)),
           rkt_(rkt),
           rk_(rk),
-          flowFile_(flowFile),
+          hdrs(make_headers()),
           messages_(std::move(messages)),
           flow_file_index_(flow_file_index),
-          attributeNameRegex_(attributeNameRegex) {
-      flow_size_ = flowFile_->getSize();
-      status_ = 0;
-      read_size_ = 0;
-      hdrs = nullptr;
-    }
+          status_(0),
+          read_size_(0),
+          attributeNameRegex_(attributeNameRegex)
+    { }
 
     ~ReadCallback() {
       if (hdrs) {
         rd_kafka_headers_destroy(hdrs);
       }
     }
 
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
-      if (max_seg_size_ == 0U || flow_size_ < max_seg_size_) {
-        max_seg_size_ = flow_size_;
-      }
+    int64_t process(const std::shared_ptr<io::BaseStream> stream) {
       std::vector<unsigned char> buffer;
-      buffer.reserve(max_seg_size_);
+      buffer.resize(max_seg_size_);
       read_size_ = 0;
       status_ = 0;
       rd_kafka_resp_err_t err;
 
-      for (auto kv : flowFile_->getAttributes()) {
-        if(attributeNameRegex_.match(kv.first)) {
-          if (!hdrs) {
-            hdrs = rd_kafka_headers_new(8);
-          }
-          err = rd_kafka_header_add(hdrs, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
-        }
-      }
+      called_ = true;
 
       size_t segment_num = 0U;
+
+      assert(flow_size_ == 0 || max_seg_size_ != 0 && "at this point, max_seg_size_ is only zero if flow_size_ is zero");
 
 Review comment:
   The same still applies: I rarely compile code in debug mode, so I don't think this assertion would warn me even if I touched the processor code.

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r369601734
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.h
 ##########
 @@ -193,121 +186,146 @@ class PublishKafka : public core::Processor {
 
   // Nest Callback Class for read stream
   class ReadCallback : public InputStreamCallback {
+    void allocate_message_object(const size_t segment_num) const {
+      messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file) {
+        // allocate message object to be filled in by the callback in produce()
+        if (flow_file.messages.size() < segment_num + 1) {
+          flow_file.messages.resize(segment_num + 1);
+        }
+      });
+    }
+
+    utils::owner<rd_kafka_headers_s*> make_headers() const {
+      rd_kafka_headers_s* const result = rd_kafka_headers_new(8);
+      for (const auto& kv : flowFile_->getAttributes()) {
+        if(attributeNameRegex_.match(kv.first)) {
+          rd_kafka_header_add(result, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
+        }
+      }
+      return result;
+    }
+
+    rd_kafka_resp_err_t produce(const size_t segment_num, std::vector<unsigned char>& buffer, const size_t buflen) const {
+      const auto messages_copy = this->messages_;
+      const auto flow_file_index_copy = this->flow_file_index_;
+      const auto produce_callback = [messages_copy, flow_file_index_copy, segment_num](rd_kafka_t * /*rk*/, const rd_kafka_message_t *rkmessage) {
+        messages_copy->modifyResult(flow_file_index_copy, [segment_num, rkmessage](FlowFileResult &flow_file) {
+          auto &message = flow_file.messages.at(segment_num);
+          message.err_code = rkmessage->err;
+          message.status = message.err_code == 0 ? MessageStatus::MESSAGESTATUS_SUCCESS : MessageStatus::MESSAGESTATUS_ERROR;
+        });
+      };
+      // release()d below, deallocated in PublishKafka::messageDeliveryCallback
+      auto callback_ptr = utils::make_unique<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>>(std::move(produce_callback));
+
+      allocate_message_object(segment_num);
+
+      if (hdrs) {
+        const utils::owner<rd_kafka_headers_t*> hdrs_copy = rd_kafka_headers_copy(hdrs);
+        const auto err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+                                           RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+        if (err) {
+          // the message only takes ownership of the headers in case of success
+          rd_kafka_headers_destroy(hdrs_copy);
+        }
+        return err;
+      } else {
+        return rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+                                RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+      }
+    }
+
    public:
-    ReadCallback(uint64_t max_seg_size,
-                 const std::string &key,
-                 rd_kafka_topic_t *rkt,
-                 rd_kafka_t *rk,
-                 const std::shared_ptr<core::FlowFile> flowFile,
+    ReadCallback(const uint64_t max_seg_size,
+                 std::string key,
+                 rd_kafka_topic_t * const rkt,
+                 rd_kafka_t * const rk,
+                 std::shared_ptr<core::FlowFile> flowFile,
                  utils::Regex &attributeNameRegex,
                  std::shared_ptr<Messages> messages,
-                 size_t flow_file_index)
-        : max_seg_size_(max_seg_size),
-          key_(key),
+                 const size_t flow_file_index,
+                 const bool fail_empty_flow_files)
+        : flowFile_(std::move(flowFile)),
+          flow_size_(flowFile_->getSize()),
+          max_seg_size_(max_seg_size == 0 || flow_size_ < max_seg_size ? flow_size_ : max_seg_size),
+          key_(std::move(key)),
           rkt_(rkt),
           rk_(rk),
-          flowFile_(flowFile),
+          hdrs(make_headers()),
           messages_(std::move(messages)),
           flow_file_index_(flow_file_index),
-          attributeNameRegex_(attributeNameRegex) {
-      flow_size_ = flowFile_->getSize();
-      status_ = 0;
-      read_size_ = 0;
-      hdrs = nullptr;
-    }
+          status_(0),
+          read_size_(0),
+          attributeNameRegex_(attributeNameRegex),
+          fail_empty_flow_files_(fail_empty_flow_files)
+    { }
 
     ~ReadCallback() {
-      if (hdrs) {
-        rd_kafka_headers_destroy(hdrs);
-      }
+      rd_kafka_headers_destroy(hdrs);
     }
 
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
-      if (max_seg_size_ == 0U || flow_size_ < max_seg_size_) {
-        max_seg_size_ = flow_size_;
-      }
+    int64_t process(const std::shared_ptr<io::BaseStream> stream) {
       std::vector<unsigned char> buffer;
-      buffer.reserve(max_seg_size_);
+
+      buffer.resize(max_seg_size_);
       read_size_ = 0;
       status_ = 0;
-      rd_kafka_resp_err_t err;
+      called_ = true;
 
-      for (auto kv : flowFile_->getAttributes()) {
-        if(attributeNameRegex_.match(kv.first)) {
-          if (!hdrs) {
-            hdrs = rd_kafka_headers_new(8);
-          }
-          err = rd_kafka_header_add(hdrs, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
-        }
+      assert(max_seg_size_ != 0 || flow_size_ == 0 && "max_seg_size_ == 0 implies flow_size_ == 0");
+      // ^^ therefore checking max_seg_size_ == 0 handles both division by zero and flow_size_ == 0 cases
+      const size_t reserved_msg_capacity = max_seg_size_ == 0 ? 1 : utils::intdiv_ceil(flow_size_, max_seg_size_);
+      messages_->modifyResult(flow_file_index_, [reserved_msg_capacity](FlowFileResult& flow_file) {
+        flow_file.messages.reserve(reserved_msg_capacity);
+      });
+
+      // If the flow file is empty, we still want to send the message, unless the user wants to fail_empty_flow_files_
 
 Review comment:
   Ah, okay, thanks! 

[GitHub] [nifi-minifi-cpp] szaszm commented on issue #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
szaszm commented on issue #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#issuecomment-583369678
 
 
   > Will run tests.
   
   @bakaid Did you run the tests?

[GitHub] [nifi-minifi-cpp] szaszm commented on issue #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
szaszm commented on issue #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#issuecomment-584180260
 
 
   Both AppVeyor test failures are unrelated:
   * ProcessorTests: Fails because it tests that a file with a name starting with a dot is treated as a hidden file and ignored by GetFile
   * TailFileTests: Fails because test files are written with CRLF line ending but TailFile expects LF.
   Both are fixed in MINIFICPP-1096 (PR coming soon)

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r368678090
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -99,6 +99,14 @@ core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of a
                                              "");
 core::Property PublishKafka::DebugContexts("Debug contexts", "A comma-separated list of debug contexts to enable."
                                            "Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all", "");
+const core::Property PublishKafka::DropEmptyFlowFiles(
+    core::PropertyBuilder::createProperty("Drop empty flow files")
+        ->withDescription("Keep backwards compatibility with <=0.7.0 bug which caused flow files with empty content to not be published to Kafka. The old behavior is deprecated. Use connections "
+                          "to drop empty flow files!")
+        ->isRequired(false)
+        ->withDefaultValue<bool>(true)
+        ->build()
+);
 
 Review comment:
   fixed in c872f3d

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r368568039
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -639,6 +647,17 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
     PublishKafka::ReadCallback callback(max_flow_seg_size, kafkaKey, thisTopic->getTopic(), conn->getConnection(), flowFile,
                                         attributeNameRegex, messages, flow_file_index);
     session->read(flowFile, &callback);
+
+    bool dropEmptyFlowFiles;
+    if (!callback.called_ && context->getProperty(DropEmptyFlowFiles.getName(), dropEmptyFlowFiles) && !dropEmptyFlowFiles) {
+      // workaround: call callback since ProcessSession doesn't do so for empty flow files
+      logger_->log_debug("ReadCallback workaround on empty flow file, because DropEmptyFlowFiles is false, uuid: %s", flowFile->getUUIDStr());
+      callback.process(nullptr);
+    }
+    if (!callback.called_ && dropEmptyFlowFiles) {
+      logger_->log_info("Deprecated behavior, use connections to drop empty flow files! Dropped empty flow file with uuid: %s", flowFile->getUUIDStr());
 
 Review comment:
   I like this log message, but I would prefer moving it to the connection to avoid duplicating it across multiple processors.
   Please also change the log level to debug (it's for developers, not operations).

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r373452314
 
 

 ##########
 File path: libminifi/include/utils/GeneralUtils.h
 ##########
 @@ -0,0 +1,50 @@
+/**
+ * @file GeneralUtils.h
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_UTILS_GENERAL_UTILS_H
+#define LIBMINIFI_INCLUDE_UTILS_GENERAL_UTILS_H
+
+#include <memory>
+#include <type_traits>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template<typename T, typename... Args>
+std::unique_ptr<T> make_unique(Args&&... args) {
+  return std::unique_ptr<T>{ new T{ std::forward<Args>(args)... } };
+}
+
+template<typename T, typename std::enable_if<std::is_integral<T>::value>::type* = nullptr>
+T intdiv_ceil(T numerator, T denominator) {
+  // note: division and remainder is 1 instruction on x86
+  return numerator / denominator + (numerator % denominator > 0);
+}
+
+template <typename T, typename std::enable_if<std::is_pointer<T>::value>::type* = nullptr>
 
 Review comment:
   This construct is good for marking that the owner of a raw pointer is responsible for deallocating it, but I would argue that, aside from interfacing with C code, raw pointers should not own memory in C++ anyway.
   In the case of `rd_kafka_headers_s`, I think creating an RAII wrapper for it would be a better solution.
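
A minimal sketch of the suggested RAII wrapper, assuming a C-style allocate/destroy pair. `fake_headers_t`, `fake_headers_new` and `fake_headers_destroy` are hypothetical stand-ins for `rd_kafka_headers_t`, `rd_kafka_headers_new` and `rd_kafka_headers_destroy`, so the example compiles without librdkafka; the deleter-struct shape is similar to the `rd_kafka_headers_deleter` that appears later in this thread.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <new>

// Hypothetical C-style API standing in for rd_kafka_headers_new/_destroy.
struct fake_headers_t { std::size_t capacity; };
static int g_destroy_calls = 0;  // counts deallocations, for demonstration only

fake_headers_t* fake_headers_new(std::size_t capacity) {
  return new (std::nothrow) fake_headers_t{capacity};  // nullptr on failure, like a C API
}
void fake_headers_destroy(fake_headers_t* p) {
  ++g_destroy_calls;
  delete p;
}

// Stateless deleter struct: works in C++11/14 and typically adds no size overhead.
struct fake_headers_deleter {
  void operator()(fake_headers_t* p) const noexcept { fake_headers_destroy(p); }
};
using fake_headers_unique_ptr = std::unique_ptr<fake_headers_t, fake_headers_deleter>;

// Acquire the C resource and immediately hand ownership to the wrapper.
fake_headers_unique_ptr make_headers(std::size_t capacity) {
  fake_headers_unique_ptr result{fake_headers_new(capacity)};
  if (!result) { throw std::bad_alloc{}; }
  return result;
}

// Returns how many times the C destroy function ran after one wrapper left scope.
int destroy_calls_after_scope() {
  g_destroy_calls = 0;
  {
    const auto hdrs = make_headers(8);
    assert(hdrs->capacity == 8);  // hdrs.get() can be passed to APIs that only borrow it
  }  // the deleter runs here, also on early return or exception
  return g_destroy_calls;
}
```

With this shape no class needs a hand-written destructor for the headers, and ownership transfer becomes an explicit `release()`.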


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r373498725
 
 

 ##########
 File path: libminifi/include/utils/GeneralUtils.h
 ##########
 @@ -0,0 +1,50 @@
+/**
+ * @file GeneralUtils.h
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_UTILS_GENERAL_UTILS_H
+#define LIBMINIFI_INCLUDE_UTILS_GENERAL_UTILS_H
+
+#include <memory>
+#include <type_traits>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template<typename T, typename... Args>
+std::unique_ptr<T> make_unique(Args&&... args) {
+  return std::unique_ptr<T>{ new T{ std::forward<Args>(args)... } };
+}
+
+template<typename T, typename std::enable_if<std::is_integral<T>::value>::type* = nullptr>
+T intdiv_ceil(T numerator, T denominator) {
+  // note: division and remainder is 1 instruction on x86
+  return numerator / denominator + (numerator % denominator > 0);
+}
+
+template <typename T, typename std::enable_if<std::is_pointer<T>::value>::type* = nullptr>
 
 Review comment:
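   Fixed.
   `std::unique_ptr<rd_kafka_headers_t, decltype([](rd_kafka_headers_t* p) { rd_kafka_headers_destroy(p); })>` both acts as an RAII wrapper and does the null checking, since `std::unique_ptr` only invokes its deleter on a non-null pointer.
   
   P.S.: can't wait for C++20 (a lambda inside `decltype` is only allowed from C++20 onward).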


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r368584995
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -639,6 +647,17 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
     PublishKafka::ReadCallback callback(max_flow_seg_size, kafkaKey, thisTopic->getTopic(), conn->getConnection(), flowFile,
                                         attributeNameRegex, messages, flow_file_index);
     session->read(flowFile, &callback);
+
+    bool dropEmptyFlowFiles;
+    if (!callback.called_ && context->getProperty(DropEmptyFlowFiles.getName(), dropEmptyFlowFiles) && !dropEmptyFlowFiles) {
 
 Review comment:
   I think the callback is the right place for producing the Kafka message, since that is normally done in response to a successful read operation, as the next step of the pipeline.
   I'm not sure that the workaround belongs in the callback, but since it shares a lot of code with the normal path, I think it's fine.
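
The callback-based workaround discussed here can be sketched as follows. `FakeSession`, `Callback` and `publish` are hypothetical simplifications, not the MiNiFi `ProcessSession`/`ReadCallback` API; the sketch only assumes what the diff states: the session skips the callback for empty content, so the caller checks a `called_` flag afterwards and invokes `process(nullptr)` itself unless empty flow files are to be dropped.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for an input stream; nullptr means "no content".
struct Stream { std::string data; };

struct Callback {
  bool called_ = false;
  int messages_produced = 0;
  // In this simplified model, every invocation produces one (possibly empty) message.
  long process(const Stream* stream) {
    called_ = true;
    ++messages_produced;
    return stream ? static_cast<long>(stream->data.size()) : 0;
  }
};

// Hypothetical session that, like the one described above, skips the callback for empty content.
struct FakeSession {
  void read(const std::string& content, Callback& cb) {
    if (content.empty()) { return; }
    Stream s{content};
    cb.process(&s);
  }
};

// Returns the number of messages produced for one flow file.
int publish(const std::string& content, bool drop_empty_flow_files) {
  FakeSession session;
  Callback callback;
  session.read(content, callback);
  if (!callback.called_ && !drop_empty_flow_files) {
    // workaround: invoke the callback ourselves so an empty message is still produced
    callback.process(nullptr);
  }
  return callback.messages_produced;
}
```

Keeping message production inside `process` means the empty and non-empty paths share the same code, which is the trade-off described in the comment above.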


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r376957040
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.h
 ##########
 @@ -194,120 +187,142 @@ class PublishKafka : public core::Processor {
   // Nest Callback Class for read stream
   class ReadCallback : public InputStreamCallback {
    public:
-    ReadCallback(uint64_t max_seg_size,
-                 const std::string &key,
-                 rd_kafka_topic_t *rkt,
-                 rd_kafka_t *rk,
-                 const std::shared_ptr<core::FlowFile> flowFile,
+    struct rd_kafka_headers_deleter {
+      void operator()(rd_kafka_headers_t* ptr) const noexcept {
+        rd_kafka_headers_destroy(ptr);
+      }
+    };
+
+    using rd_kafka_headers_unique_ptr = std::unique_ptr<rd_kafka_headers_t, rd_kafka_headers_deleter>;
+
+   private:
+    void allocate_message_object(const size_t segment_num) const {
+      messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file) {
+        // allocate message object to be filled in by the callback in produce()
+        if (flow_file.messages.size() < segment_num + 1) {
+          flow_file.messages.resize(segment_num + 1);
+        }
+      });
+    }
+
+    static rd_kafka_headers_unique_ptr make_headers(const core::FlowFile& flow_file, utils::Regex& attribute_name_regex) {
+      const utils::owner<rd_kafka_headers_t*> result{ rd_kafka_headers_new(8) };
+      if (!result) { throw std::bad_alloc{}; }
+
+      for (const auto& kv : flow_file.getAttributes()) {
 
 Review comment:
   Compilation fails with
   ```
   /Users/danielbakai/nifi-minifi-cpp/extensions/librdkafka/PublishKafka.h:212:29: error: 'this' argument to
         member function 'getAttributes' has type 'const core::FlowFile', but function is not marked const
         for (const auto& kv : flow_file.getAttributes()) {
   ```


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r368580497
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.h
 ##########
 @@ -193,121 +186,150 @@ class PublishKafka : public core::Processor {
 
   // Nest Callback Class for read stream
   class ReadCallback : public InputStreamCallback {
+    void allocate_message_object(const size_t segment_num) const {
+      messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file) {
+        // allocate message object to be filled in by the callback in produce()
+        if (flow_file.messages.size() < segment_num + 1) {
+          flow_file.messages.resize(segment_num + 1);
+        }
+      });
+    }
+
+    rd_kafka_headers_s* make_headers() const {
+      rd_kafka_headers_s* const result = rd_kafka_headers_new(8);
+      for (const auto& kv : flowFile_->getAttributes()) {
+        if(attributeNameRegex_.match(kv.first)) {
+          rd_kafka_header_add(result, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
+        }
+      }
+      return result;
+    }
+
+    rd_kafka_resp_err_t produce(const size_t segment_num, std::vector<unsigned char>& buffer, const size_t buflen) const {
+      rd_kafka_resp_err_t err{};
+
+      const auto messages_copy = this->messages_;
+      const auto flow_file_index_copy = this->flow_file_index_;
+      const auto produce_callback = [messages_copy, flow_file_index_copy, segment_num](rd_kafka_t * /*rk*/, const rd_kafka_message_t *rkmessage) {
+        messages_copy->modifyResult(flow_file_index_copy, [segment_num, rkmessage](FlowFileResult &flow_file) {
+          auto &message = flow_file.messages.at(segment_num);
+          message.err_code = rkmessage->err;
+          message.status = message.err_code == 0 ? MessageStatus::MESSAGESTATUS_SUCCESS : MessageStatus::MESSAGESTATUS_ERROR;
+        });
+      };
+      // release()d below, deallocated in PublishKafka::messageDeliveryCallback
+      auto callback_ptr = utils::make_unique<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>>(std::move(produce_callback));
+
+      allocate_message_object(segment_num);
+
+      if (hdrs) {
+        rd_kafka_headers_t * const hdrs_copy = rd_kafka_headers_copy(hdrs);
+        err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+                                RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+        if (err) {
+          // the message only takes ownership of the headers in case of success
+          rd_kafka_headers_destroy(hdrs_copy);
+        }
+      } else {
+        err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+                                RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+      }
+      return err;
+    }
+
    public:
-    ReadCallback(uint64_t max_seg_size,
-                 const std::string &key,
-                 rd_kafka_topic_t *rkt,
-                 rd_kafka_t *rk,
-                 const std::shared_ptr<core::FlowFile> flowFile,
+    ReadCallback(const uint64_t max_seg_size,
+                 std::string key,
+                 rd_kafka_topic_t * const rkt,
+                 rd_kafka_t * const rk,
+                 std::shared_ptr<core::FlowFile> flowFile,
                  utils::Regex &attributeNameRegex,
                  std::shared_ptr<Messages> messages,
-                 size_t flow_file_index)
-        : max_seg_size_(max_seg_size),
-          key_(key),
+                 const size_t flow_file_index)
+        : flowFile_(std::move(flowFile)),
+          flow_size_(flowFile_->getSize()),
+          max_seg_size_(max_seg_size == 0 || flow_size_ < max_seg_size ? flow_size_ : max_seg_size),
+          key_(std::move(key)),
           rkt_(rkt),
           rk_(rk),
-          flowFile_(flowFile),
+          hdrs(make_headers()),
           messages_(std::move(messages)),
           flow_file_index_(flow_file_index),
-          attributeNameRegex_(attributeNameRegex) {
-      flow_size_ = flowFile_->getSize();
-      status_ = 0;
-      read_size_ = 0;
-      hdrs = nullptr;
-    }
+          status_(0),
+          read_size_(0),
+          attributeNameRegex_(attributeNameRegex)
+    { }
 
     ~ReadCallback() {
       if (hdrs) {
         rd_kafka_headers_destroy(hdrs);
       }
     }
 
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
-      if (max_seg_size_ == 0U || flow_size_ < max_seg_size_) {
-        max_seg_size_ = flow_size_;
-      }
+    int64_t process(const std::shared_ptr<io::BaseStream> stream) {
       std::vector<unsigned char> buffer;
-      buffer.reserve(max_seg_size_);
+      buffer.resize(max_seg_size_);
       read_size_ = 0;
       status_ = 0;
       rd_kafka_resp_err_t err;
 
-      for (auto kv : flowFile_->getAttributes()) {
-        if(attributeNameRegex_.match(kv.first)) {
-          if (!hdrs) {
-            hdrs = rd_kafka_headers_new(8);
-          }
-          err = rd_kafka_header_add(hdrs, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
-        }
-      }
+      called_ = true;
 
       size_t segment_num = 0U;
+
+      assert(flow_size_ == 0 || max_seg_size_ != 0 && "at this point, max_seg_size_ is only zero if flow_size_ is zero");
 
 Review comment:
   It's meant as a note to the developer, not as a runtime condition check. The logging line would be dead code if I changed it to that.
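
For context, the `assert(condition && "message")` idiom works because a string literal decays to a non-null pointer, which converts to `true`, so appending it never changes the result; the message only appears in the diagnostic when the assertion fires, and the check is compiled out when `NDEBUG` is defined. Because `&&` binds tighter than `||`, the assertion in the diff parses as `flow_size_ == 0 || (max_seg_size_ != 0 && "...")`, which is equivalent to the intended condition. A minimal sketch with explicit parentheses; the function names are hypothetical.

```cpp
#include <cassert>
#include <cstdint>

// The invariant from the diff, as a plain predicate.
bool segment_size_invariant(std::uint64_t flow_size, std::uint64_t max_seg_size) {
  return flow_size == 0 || max_seg_size != 0;
}

void check_segment_size(std::uint64_t flow_size, std::uint64_t max_seg_size) {
  // Parenthesized to make the grouping explicit (and avoid -Wparentheses);
  // the string literal is always truthy and only documents the invariant.
  assert((flow_size == 0 || max_seg_size != 0) && "max_seg_size is only zero if flow_size is zero");
  (void)flow_size;
  (void)max_seg_size;  // avoid unused-parameter warnings when NDEBUG is defined
}
```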


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r369602843
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -99,6 +99,13 @@ core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of a
                                              "");
 core::Property PublishKafka::DebugContexts("Debug contexts", "A comma-separated list of debug contexts to enable."
                                            "Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all", "");
+const core::Property PublishKafka::FailEmptyFlowFiles(
+    core::PropertyBuilder::createProperty("Fail empty flow files")
+        ->withDescription("Keep backwards compatibility with <=0.7.0 bug which caused flow files with empty content to not be published to Kafka and forwarded to failure. The old behavior is "
+                          "deprecated. Use connections to drop empty flow files!")
+        ->isRequired(false)
 
 Review comment:
   Fair point.
   Having a default value for a property that is not required always smells bad to me, but in this case it's justified.
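
One way to keep a non-required property with a default value safe is to initialize the target variable with the default before the lookup, so it is never read uninitialized when the property is absent. A minimal sketch, assuming a plain `std::map` of configured values; `get_bool_property` and its parsing rules are hypothetical and not the MiNiFi `ProcessContext::getProperty` API.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical lookup: returns the configured value if present and parseable,
// otherwise the supplied default.
bool get_bool_property(const std::map<std::string, std::string>& config,
                       const std::string& name, bool default_value) {
  bool value = default_value;  // initialized up front: never read uninitialized
  const auto it = config.find(name);
  if (it != config.end()) {
    if (it->second == "true") {
      value = true;
    } else if (it->second == "false") {
      value = false;
    }  // any other string keeps the default in this sketch
  }
  return value;
}
```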


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r368575021
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -639,6 +647,17 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
     PublishKafka::ReadCallback callback(max_flow_seg_size, kafkaKey, thisTopic->getTopic(), conn->getConnection(), flowFile,
                                         attributeNameRegex, messages, flow_file_index);
     session->read(flowFile, &callback);
+
+    bool dropEmptyFlowFiles;
+    if (!callback.called_ && context->getProperty(DropEmptyFlowFiles.getName(), dropEmptyFlowFiles) && !dropEmptyFlowFiles) {
 
 Review comment:
   How does this solve the original issue?
   In case `session->read` fails due to a missing content claim, it swallows the exception, so execution continues but the callback is never called?
   
   What's the motivation for moving the Kafka-related logic into the callback?
   
   I also have a feeling that this solution won't work in the case where there IS a content claim, but the content is actually empty.
   Because of this, I don't think that moving the logic into the callback is a good idea.


[GitHub] [nifi-minifi-cpp] szaszm edited a comment on issue #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
szaszm edited a comment on issue #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#issuecomment-576289051
 
 
   What is your opinion on using `const` by default, i.e. always, unless there's a reason not to?
   
   Pros:
   * extra static analysis -> bugs caught earlier
   * intent stated more explicitly -> more readable
   
   Cons:
   * more typing
   * more clutter -> less readable
   * not consistent with the current state of the codebase
     * but can be viewed as gradual evolution if we're in favor of this style


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r368581179
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -99,6 +99,14 @@ core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of a
                                              "");
 core::Property PublishKafka::DebugContexts("Debug contexts", "A comma-separated list of debug contexts to enable."
                                            "Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all", "");
+const core::Property PublishKafka::DropEmptyFlowFiles(
+    core::PropertyBuilder::createProperty("Drop empty flow files")
+        ->withDescription("Keep backwards compatibility with <=0.7.0 bug which caused flow files with empty content to not be published to Kafka. The old behavior is deprecated. Use connections "
+                          "to drop empty flow files!")
+        ->isRequired(false)
+        ->withDefaultValue<bool>(true)
+        ->build()
+);
 
 Review comment:
   Yep


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r373458007
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.h
 ##########
 @@ -193,121 +186,146 @@ class PublishKafka : public core::Processor {
 
   // Nest Callback Class for read stream
   class ReadCallback : public InputStreamCallback {
+    void allocate_message_object(const size_t segment_num) const {
+      messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file) {
+        // allocate message object to be filled in by the callback in produce()
+        if (flow_file.messages.size() < segment_num + 1) {
+          flow_file.messages.resize(segment_num + 1);
+        }
+      });
+    }
+
+    utils::owner<rd_kafka_headers_s*> make_headers() const {
+      rd_kafka_headers_s* const result = rd_kafka_headers_new(8);
+      for (const auto& kv : flowFile_->getAttributes()) {
+        if(attributeNameRegex_.match(kv.first)) {
+          rd_kafka_header_add(result, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
+        }
+      }
+      return result;
+    }
+
+    rd_kafka_resp_err_t produce(const size_t segment_num, std::vector<unsigned char>& buffer, const size_t buflen) const {
+      const auto messages_copy = this->messages_;
+      const auto flow_file_index_copy = this->flow_file_index_;
+      const auto produce_callback = [messages_copy, flow_file_index_copy, segment_num](rd_kafka_t * /*rk*/, const rd_kafka_message_t *rkmessage) {
+        messages_copy->modifyResult(flow_file_index_copy, [segment_num, rkmessage](FlowFileResult &flow_file) {
+          auto &message = flow_file.messages.at(segment_num);
+          message.err_code = rkmessage->err;
+          message.status = message.err_code == 0 ? MessageStatus::MESSAGESTATUS_SUCCESS : MessageStatus::MESSAGESTATUS_ERROR;
+        });
+      };
+      // release()d below, deallocated in PublishKafka::messageDeliveryCallback
+      auto callback_ptr = utils::make_unique<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>>(std::move(produce_callback));
+
+      allocate_message_object(segment_num);
+
+      if (hdrs) {
+        const utils::owner<rd_kafka_headers_t*> hdrs_copy = rd_kafka_headers_copy(hdrs);
+        const auto err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+                                           RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+        if (err) {
+          // the message only takes ownership of the headers in case of success
+          rd_kafka_headers_destroy(hdrs_copy);
+        }
+        return err;
+      } else {
+        return rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+                                RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+      }
+    }
+
    public:
-    ReadCallback(uint64_t max_seg_size,
-                 const std::string &key,
-                 rd_kafka_topic_t *rkt,
-                 rd_kafka_t *rk,
-                 const std::shared_ptr<core::FlowFile> flowFile,
+    ReadCallback(const uint64_t max_seg_size,
+                 std::string key,
+                 rd_kafka_topic_t * const rkt,
+                 rd_kafka_t * const rk,
+                 std::shared_ptr<core::FlowFile> flowFile,
                  utils::Regex &attributeNameRegex,
                  std::shared_ptr<Messages> messages,
-                 size_t flow_file_index)
-        : max_seg_size_(max_seg_size),
-          key_(key),
+                 const size_t flow_file_index,
+                 const bool fail_empty_flow_files)
+        : flowFile_(std::move(flowFile)),
+          flow_size_(flowFile_->getSize()),
+          max_seg_size_(max_seg_size == 0 || flow_size_ < max_seg_size ? flow_size_ : max_seg_size),
+          key_(std::move(key)),
           rkt_(rkt),
           rk_(rk),
-          flowFile_(flowFile),
+          hdrs(make_headers()),
           messages_(std::move(messages)),
           flow_file_index_(flow_file_index),
-          attributeNameRegex_(attributeNameRegex) {
-      flow_size_ = flowFile_->getSize();
-      status_ = 0;
-      read_size_ = 0;
-      hdrs = nullptr;
-    }
+          status_(0),
+          read_size_(0),
+          attributeNameRegex_(attributeNameRegex),
+          fail_empty_flow_files_(fail_empty_flow_files)
+    { }
 
     ~ReadCallback() {
-      if (hdrs) {
-        rd_kafka_headers_destroy(hdrs);
-      }
+      rd_kafka_headers_destroy(hdrs);
 
 Review comment:
   I am not a big fan of unnecessary nullptr checks in cases where they really don't make sense, like before `free`.
   In this case, however, `rd_kafka_headers_destroy` will segfault if it gets a nullptr: https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_header.c#L36, and in general, I prefer more defensiveness around third-party APIs.
   Even if, in the current state of the code, we can be sure that `hdrs` won't be nullptr, a later refactor might break that implicit assumption and cause an issue here.
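
A minimal sketch of two ways to keep the cleanup defensive against a null handle. `c_headers_t` and `c_headers_destroy` are hypothetical stand-ins for `rd_kafka_headers_t` and `rd_kafka_headers_destroy`; the stub asserts on nullptr to mimic a destroy function that does not accept it. The first option keeps the explicit check in a destructor, the second relies on `std::unique_ptr`, which never invokes its deleter on a null pointer.

```cpp
#include <cassert>
#include <memory>

struct c_headers_t { int id; };
static int g_destroyed = 0;  // counts successful destroy calls, for demonstration only

// Like the real function described above, this stub must not receive nullptr.
void c_headers_destroy(c_headers_t* p) {
  assert(p != nullptr && "destroy must not be called with nullptr");
  ++g_destroyed;
  delete p;
}

// Option 1: raw owning pointer with a guarded destructor.
class HeadersHolder {
 public:
  explicit HeadersHolder(c_headers_t* hdrs) : hdrs_(hdrs) {}
  HeadersHolder(const HeadersHolder&) = delete;
  HeadersHolder& operator=(const HeadersHolder&) = delete;
  ~HeadersHolder() {
    if (hdrs_) { c_headers_destroy(hdrs_); }  // defensive: tolerate a null handle
  }

 private:
  c_headers_t* hdrs_;
};

// Option 2: unique_ptr skips the deleter for nullptr, so no explicit check is needed.
struct c_headers_deleter {
  void operator()(c_headers_t* p) const noexcept { c_headers_destroy(p); }
};
using c_headers_ptr = std::unique_ptr<c_headers_t, c_headers_deleter>;

// Returns how many handles were destroyed when both owners leave scope.
int destroyed_count(bool allocate) {
  g_destroyed = 0;
  {
    HeadersHolder holder{allocate ? new c_headers_t{1} : nullptr};
    c_headers_ptr owned{allocate ? new c_headers_t{2} : nullptr};
  }
  return g_destroyed;
}
```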


[GitHub] [nifi-minifi-cpp] szaszm edited a comment on issue #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
szaszm edited a comment on issue #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#issuecomment-584180260
 
 
   Both AppVeyor test failures are unrelated:
   * ProcessorTests: Fails because it tests that a file with a name starting with a dot is treated as a hidden file and ignored by GetFile
   * TailFileTests: Fails because test files are written with CRLF line endings but TailFile expects LF.
   
   Both are fixed in MINIFICPP-1096 (PR coming soon)


[GitHub] [nifi-minifi-cpp] arpadboda closed pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
arpadboda closed pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712
 
 
   


[GitHub] [nifi-minifi-cpp] szaszm commented on issue #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
szaszm commented on issue #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#issuecomment-576289051
 
 
   What is your opinion on using `const` by default?
   
   Pros:
   * extra static analysis -> bugs caught earlier
   * intent stated more explicitly -> more readable
   
   Cons:
   * more typing
   * more clutter -> less readable
   * not consistent with the current state of the codebase
     * but can be viewed as gradual evolution if we're in favor of this style


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r369472796
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.h
 ##########
 @@ -193,121 +186,146 @@ class PublishKafka : public core::Processor {
 
   // Nest Callback Class for read stream
   class ReadCallback : public InputStreamCallback {
+    void allocate_message_object(const size_t segment_num) const {
+      messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file) {
+        // allocate message object to be filled in by the callback in produce()
+        if (flow_file.messages.size() < segment_num + 1) {
+          flow_file.messages.resize(segment_num + 1);
+        }
+      });
+    }
+
+    utils::owner<rd_kafka_headers_s*> make_headers() const {
+      rd_kafka_headers_s* const result = rd_kafka_headers_new(8);
+      for (const auto& kv : flowFile_->getAttributes()) {
+        if(attributeNameRegex_.match(kv.first)) {
+          rd_kafka_header_add(result, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
+        }
+      }
+      return result;
+    }
+
+    rd_kafka_resp_err_t produce(const size_t segment_num, std::vector<unsigned char>& buffer, const size_t buflen) const {
+      const auto messages_copy = this->messages_;
+      const auto flow_file_index_copy = this->flow_file_index_;
+      const auto produce_callback = [messages_copy, flow_file_index_copy, segment_num](rd_kafka_t * /*rk*/, const rd_kafka_message_t *rkmessage) {
+        messages_copy->modifyResult(flow_file_index_copy, [segment_num, rkmessage](FlowFileResult &flow_file) {
+          auto &message = flow_file.messages.at(segment_num);
+          message.err_code = rkmessage->err;
+          message.status = message.err_code == 0 ? MessageStatus::MESSAGESTATUS_SUCCESS : MessageStatus::MESSAGESTATUS_ERROR;
+        });
+      };
+      // release()d below, deallocated in PublishKafka::messageDeliveryCallback
+      auto callback_ptr = utils::make_unique<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>>(std::move(produce_callback));
+
+      allocate_message_object(segment_num);
+
+      if (hdrs) {
+        const utils::owner<rd_kafka_headers_t*> hdrs_copy = rd_kafka_headers_copy(hdrs);
+        const auto err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+                                           RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+        if (err) {
+          // the message only takes ownership of the headers in case of success
+          rd_kafka_headers_destroy(hdrs_copy);
+        }
+        return err;
+      } else {
+        return rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+                                RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+      }
+    }
+
    public:
-    ReadCallback(uint64_t max_seg_size,
-                 const std::string &key,
-                 rd_kafka_topic_t *rkt,
-                 rd_kafka_t *rk,
-                 const std::shared_ptr<core::FlowFile> flowFile,
+    ReadCallback(const uint64_t max_seg_size,
+                 std::string key,
+                 rd_kafka_topic_t * const rkt,
+                 rd_kafka_t * const rk,
+                 std::shared_ptr<core::FlowFile> flowFile,
                  utils::Regex &attributeNameRegex,
                  std::shared_ptr<Messages> messages,
-                 size_t flow_file_index)
-        : max_seg_size_(max_seg_size),
-          key_(key),
+                 const size_t flow_file_index,
+                 const bool fail_empty_flow_files)
+        : flowFile_(std::move(flowFile)),
+          flow_size_(flowFile_->getSize()),
+          max_seg_size_(max_seg_size == 0 || flow_size_ < max_seg_size ? flow_size_ : max_seg_size),
+          key_(std::move(key)),
           rkt_(rkt),
           rk_(rk),
-          flowFile_(flowFile),
+          hdrs(make_headers()),
           messages_(std::move(messages)),
           flow_file_index_(flow_file_index),
-          attributeNameRegex_(attributeNameRegex) {
-      flow_size_ = flowFile_->getSize();
-      status_ = 0;
-      read_size_ = 0;
-      hdrs = nullptr;
-    }
+          status_(0),
+          read_size_(0),
+          attributeNameRegex_(attributeNameRegex),
+          fail_empty_flow_files_(fail_empty_flow_files)
+    { }
 
     ~ReadCallback() {
-      if (hdrs) {
-        rd_kafka_headers_destroy(hdrs);
-      }
+      rd_kafka_headers_destroy(hdrs);
     }
 
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
-      if (max_seg_size_ == 0U || flow_size_ < max_seg_size_) {
-        max_seg_size_ = flow_size_;
-      }
+    int64_t process(const std::shared_ptr<io::BaseStream> stream) {
       std::vector<unsigned char> buffer;
-      buffer.reserve(max_seg_size_);
+
+      buffer.resize(max_seg_size_);
       read_size_ = 0;
       status_ = 0;
-      rd_kafka_resp_err_t err;
+      called_ = true;
 
-      for (auto kv : flowFile_->getAttributes()) {
-        if(attributeNameRegex_.match(kv.first)) {
-          if (!hdrs) {
-            hdrs = rd_kafka_headers_new(8);
-          }
-          err = rd_kafka_header_add(hdrs, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
-        }
+      assert(max_seg_size_ != 0 || flow_size_ == 0 && "max_seg_size_ == 0 implies flow_size_ == 0");
+      // ^^ therefore checking max_seg_size_ == 0 handles both division by zero and flow_size_ == 0 cases
+      const size_t reserved_msg_capacity = max_seg_size_ == 0 ? 1 : utils::intdiv_ceil(flow_size_, max_seg_size_);
+      messages_->modifyResult(flow_file_index_, [reserved_msg_capacity](FlowFileResult& flow_file) {
+        flow_file.messages.reserve(reserved_msg_capacity);
+      });
+
+      // If the flow file is empty, we still want to send the message, unless the user wants to fail_empty_flow_files_
 
 Review comment:
   see these 3 lines for proof: https://github.com/apache/nifi-minifi-cpp/pull/712/files#diff-9917deef91cc1503aff82b472310b7eaL662
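
The capacity computation in the `process()` diff above can be read in isolation. The sketch below copies the shape of `utils::intdiv_ceil` from the GeneralUtils.h diff; `reserved_msg_capacity` as a free function is a hypothetical extraction. It assumes the segment size has already been clamped as in the constructor, so a zero segment size can only mean an empty flow file.

```cpp
#include <cassert>
#include <cstdint>
#include <type_traits>

// Same shape as utils::intdiv_ceil in the GeneralUtils.h diff: round the
// quotient up when the division leaves a remainder (non-negative operands).
template<typename T, typename std::enable_if<std::is_integral<T>::value>::type* = nullptr>
T intdiv_ceil(T numerator, T denominator) {
  return numerator / denominator + (numerator % denominator > 0);
}

// Hypothetical extraction of the process() logic. `seg_size` is assumed to be
// clamped already, so seg_size == 0 implies flow_size == 0. Testing seg_size
// alone therefore covers both division by zero and the empty flow file, which
// still gets room for exactly one (empty) message.
std::uint64_t reserved_msg_capacity(std::uint64_t flow_size, std::uint64_t seg_size) {
  return seg_size == 0 ? 1 : intdiv_ceil(flow_size, seg_size);
}
```

For example, a 25-byte flow file with 10-byte segments needs 3 messages, while an empty flow file still reserves one.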

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r368582441
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -639,6 +647,17 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
     PublishKafka::ReadCallback callback(max_flow_seg_size, kafkaKey, thisTopic->getTopic(), conn->getConnection(), flowFile,
                                         attributeNameRegex, messages, flow_file_index);
     session->read(flowFile, &callback);
+
+    bool dropEmptyFlowFiles;
+    if (!callback.called_ && context->getProperty(DropEmptyFlowFiles.getName(), dropEmptyFlowFiles) && !dropEmptyFlowFiles) {
 
 Review comment:
   Producing the Kafka message was part of the callback even before my changes. The issue is that when there is no `ResourceClaim`, `ProcessSession` doesn't call the callback, so the message is never sent.
   
   I have extended the callback to handle empty flow files and made sure it's called with the above lines. The callback is invoked manually exactly when `session->read()` did not call it and the user opted in to the correct behavior by setting `DropEmptyFlowFiles` to false.
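
The control flow described above can be modelled in a few lines. Everything here is a hypothetical, heavily simplified stand-in, not the MiNiFi API. `FlowFile`, `InputStreamCallback`, `session_read` and `PublishCallback` only mimic the roles of `core::FlowFile`, `InputStreamCallback`, `ProcessSession::read()` and `ReadCallback`, and a null stream stands in for "no content".

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical stand-ins; not the MiNiFi classes.
struct FlowFile {
  std::string content;
  bool has_resource_claim;  // false for flow files that never had content
};

struct InputStreamCallback {
  virtual ~InputStreamCallback() = default;
  virtual std::int64_t process(const std::string* stream) = 0;
};

// Models the behavior described above: without a resource claim the
// session never invokes the callback.
void session_read(const FlowFile& ff, InputStreamCallback* cb) {
  if (ff.has_resource_claim) cb->process(&ff.content);
}

struct PublishCallback : InputStreamCallback {
  bool called_ = false;
  int messages_sent = 0;
  std::int64_t process(const std::string* stream) override {
    called_ = true;
    ++messages_sent;  // an empty or missing stream still yields one empty message
    return stream ? static_cast<std::int64_t>(stream->size()) : 0;
  }
};

int publish(const FlowFile& ff, bool drop_empty_flow_files) {
  PublishCallback callback;
  session_read(ff, &callback);
  // If the session skipped the callback and the user did not opt into
  // dropping empty flow files, invoke it directly with no stream.
  if (!callback.called_ && !drop_empty_flow_files) {
    callback.process(nullptr);
  }
  return callback.messages_sent;
}
```

With a flow file that has a claim, the extra branch never fires. It only triggers for claim-less flow files when dropping is disabled.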

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r369472085
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -99,6 +99,13 @@ core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of a
                                              "");
 core::Property PublishKafka::DebugContexts("Debug contexts", "A comma-separated list of debug contexts to enable."
                                            "Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all", "");
+const core::Property PublishKafka::FailEmptyFlowFiles(
+    core::PropertyBuilder::createProperty("Fail empty flow files")
+        ->withDescription("Keep backwards compatibility with <=0.7.0 bug which caused flow files with empty content to not be published to Kafka and forwarded to failure. The old behavior is "
+                          "deprecated. Use connections to drop empty flow files!")
+        ->isRequired(false)
 
 Review comment:
   If we make it required, old flows missing the property will fail, so we will not be backwards compatible.

[GitHub] [nifi-minifi-cpp] arpadboda commented on issue #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
arpadboda commented on issue #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#issuecomment-584179674
 
 
   Merged as 455d4b35bb6bc70373372d66ceaf9d35a4d32a49
   Forgot to close it via the commit message, closing manually.

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r369605832
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -99,6 +99,13 @@ core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of a
                                              "");
 core::Property PublishKafka::DebugContexts("Debug contexts", "A comma-separated list of debug contexts to enable."
                                            "Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all", "");
+const core::Property PublishKafka::FailEmptyFlowFiles(
+    core::PropertyBuilder::createProperty("Fail empty flow files")
+        ->withDescription("Keep backwards compatibility with <=0.7.0 bug which caused flow files with empty content to not be published to Kafka and forwarded to failure. The old behavior is "
+                          "deprecated. Use connections to drop empty flow files!")
+        ->isRequired(false)
 
 Review comment:
   Does a default value satisfy the requirement for the property? If so, I can set `isRequired(true)` and old flows would still keep working.
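
The question hinges on validation semantics, which this thread does not settle. The sketch below models the *assumed* semantics in which a default satisfies a required property. `PropertySpec`, `FlowConfig`, `is_satisfied` and `effective_value` are hypothetical and are not MiNiFi's `core::Property` / `PropertyBuilder` API. Under this model, an old flow that omits "Fail empty flow files" stays valid and falls back to the default.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical, simplified property model; not MiNiFi's core::Property.
struct PropertySpec {
  std::string name;
  bool required;
  bool has_default;
  std::string default_value;
};

using FlowConfig = std::map<std::string, std::string>;

// Assumed rule: a required property is satisfied either by an explicit value
// in the flow configuration or by a default value.
bool is_satisfied(const PropertySpec& spec, const FlowConfig& config) {
  if (config.count(spec.name) != 0) return true;
  return !spec.required || spec.has_default;
}

// Effective value: an explicit value wins, otherwise the default (if any).
std::string effective_value(const PropertySpec& spec, const FlowConfig& config) {
  const auto it = config.find(spec.name);
  if (it != config.end()) return it->second;
  return spec.has_default ? spec.default_value : std::string{};
}
```

If the real validator did *not* count defaults, only the `!spec.required` branch would remain. That is exactly the case where making the property required breaks old flows.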

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r373498725
 
 

 ##########
 File path: libminifi/include/utils/GeneralUtils.h
 ##########
 @@ -0,0 +1,50 @@
+/**
+ * @file GeneralUtils.h
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_UTILS_GENERAL_UTILS_H
+#define LIBMINIFI_INCLUDE_UTILS_GENERAL_UTILS_H
+
+#include <memory>
+#include <type_traits>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template<typename T, typename... Args>
+std::unique_ptr<T> make_unique(Args&&... args) {
+  return std::unique_ptr<T>{ new T{ std::forward<Args>(args)... } };
+}
+
+template<typename T, typename std::enable_if<std::is_integral<T>::value>::type* = nullptr>
+T intdiv_ceil(T numerator, T denominator) {
+  // note: division and remainder is 1 instruction on x86
+  return numerator / denominator + (numerator % denominator > 0);
+}
+
+template <typename T, typename std::enable_if<std::is_pointer<T>::value>::type* = nullptr>
 
 Review comment:
   `std::unique_ptr<rd_kafka_headers_t, decltype([](rd_kafka_headers_t* p) { rd_kafka_headers_destroy(p); })>` both acts as an RAII wrapper and does the null checking.
   
   p.s.: can't wait for C++20 
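
The `decltype([](...){...})` spelling needs C++20 because lambdas are not allowed in unevaluated operands before that. The pre-C++20 equivalent is a small stateless deleter type. To keep the sketch self-contained, `fake_headers_t`, `fake_headers_new` and `fake_headers_destroy` are hypothetical stand-ins for librdkafka's `rd_kafka_headers_t`, `rd_kafka_headers_new()` and `rd_kafka_headers_destroy()`. A counter makes the deleter calls observable.

```cpp
#include <cassert>
#include <memory>

// Stand-ins for the librdkafka header functions, so this compiles without librdkafka.
struct fake_headers_t { int count; };
int g_destroyed_count = 0;
fake_headers_t* fake_headers_new() { return new fake_headers_t{0}; }
void fake_headers_destroy(fake_headers_t* h) { ++g_destroyed_count; delete h; }

// C++11/14 spelling of the same RAII wrapper: a deleter type instead of
// decltype of a lambda.
struct HeadersDeleter {
  void operator()(fake_headers_t* h) const { fake_headers_destroy(h); }
};
using HeadersPtr = std::unique_ptr<fake_headers_t, HeadersDeleter>;
```

`std::unique_ptr` only invokes its deleter when the stored pointer is non-null, which is the "null checking" part. `release()` hands ownership to someone else without destroying, matching the pattern `produce()` uses when a message takes over the headers.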

[GitHub] [nifi-minifi-cpp] arpadboda commented on issue #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
arpadboda commented on issue #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#issuecomment-577217138
 
 
   LGTM, thanks!

[GitHub] [nifi-minifi-cpp] bakaid commented on issue #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
bakaid commented on issue #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#issuecomment-583375672
 
 
   @szaszm Unfortunately I was always preempted by something, but I am building your final version right now.

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r368600603
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -639,6 +647,17 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
     PublishKafka::ReadCallback callback(max_flow_seg_size, kafkaKey, thisTopic->getTopic(), conn->getConnection(), flowFile,
                                         attributeNameRegex, messages, flow_file_index);
     session->read(flowFile, &callback);
+
+    bool dropEmptyFlowFiles;
+    if (!callback.called_ && context->getProperty(DropEmptyFlowFiles.getName(), dropEmptyFlowFiles) && !dropEmptyFlowFiles) {
 
 Review comment:
   Indeed, the behavior is inconsistent. When DropEmptyFlowFiles is true and there is a content claim, we don't drop the empty flow file. Will fix this.
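
The inconsistency can be stated as two small decision functions. Both are hypothetical simplifications for illustration, with `Outcome` an invented enum. The first mirrors the behavior described in this comment, where only claim-less flow files are affected by the property. The second makes the decision depend only on the flow file's size and the property value.

```cpp
#include <cassert>
#include <cstdint>

enum class Outcome { Publish, Drop };

// Behavior as described above: an empty flow file *with* a content claim goes
// through the callback and is published even when dropping is requested.
Outcome decide_inconsistent(bool has_content_claim, bool drop_empty_flow_files) {
  if (!has_content_claim && drop_empty_flow_files) return Outcome::Drop;
  return Outcome::Publish;
}

// Consistent rule: whether a content claim exists no longer matters.
Outcome decide_consistent(std::uint64_t flow_size, bool drop_empty_flow_files) {
  return flow_size == 0 && drop_empty_flow_files ? Outcome::Drop : Outcome::Publish;
}
```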

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r368599433
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.h
 ##########
 @@ -193,121 +186,150 @@ class PublishKafka : public core::Processor {
 
   // Nest Callback Class for read stream
   class ReadCallback : public InputStreamCallback {
+    void allocate_message_object(const size_t segment_num) const {
+      messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file) {
+        // allocate message object to be filled in by the callback in produce()
+        if (flow_file.messages.size() < segment_num + 1) {
+          flow_file.messages.resize(segment_num + 1);
+        }
+      });
+    }
+
+    rd_kafka_headers_s* make_headers() const {
+      rd_kafka_headers_s* const result = rd_kafka_headers_new(8);
+      for (const auto& kv : flowFile_->getAttributes()) {
+        if(attributeNameRegex_.match(kv.first)) {
+          rd_kafka_header_add(result, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
+        }
+      }
+      return result;
+    }
+
+    rd_kafka_resp_err_t produce(const size_t segment_num, std::vector<unsigned char>& buffer, const size_t buflen) const {
+      rd_kafka_resp_err_t err{};
+
+      const auto messages_copy = this->messages_;
+      const auto flow_file_index_copy = this->flow_file_index_;
+      const auto produce_callback = [messages_copy, flow_file_index_copy, segment_num](rd_kafka_t * /*rk*/, const rd_kafka_message_t *rkmessage) {
+        messages_copy->modifyResult(flow_file_index_copy, [segment_num, rkmessage](FlowFileResult &flow_file) {
+          auto &message = flow_file.messages.at(segment_num);
+          message.err_code = rkmessage->err;
+          message.status = message.err_code == 0 ? MessageStatus::MESSAGESTATUS_SUCCESS : MessageStatus::MESSAGESTATUS_ERROR;
+        });
+      };
+      // release()d below, deallocated in PublishKafka::messageDeliveryCallback
+      auto callback_ptr = utils::make_unique<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>>(std::move(produce_callback));
+
+      allocate_message_object(segment_num);
+
+      if (hdrs) {
+        rd_kafka_headers_t * const hdrs_copy = rd_kafka_headers_copy(hdrs);
+        err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+                                RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+        if (err) {
+          // the message only takes ownership of the headers in case of success
+          rd_kafka_headers_destroy(hdrs_copy);
+        }
+      } else {
+        err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+                                RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+      }
+      return err;
+    }
+
    public:
-    ReadCallback(uint64_t max_seg_size,
-                 const std::string &key,
-                 rd_kafka_topic_t *rkt,
-                 rd_kafka_t *rk,
-                 const std::shared_ptr<core::FlowFile> flowFile,
+    ReadCallback(const uint64_t max_seg_size,
+                 std::string key,
+                 rd_kafka_topic_t * const rkt,
+                 rd_kafka_t * const rk,
+                 std::shared_ptr<core::FlowFile> flowFile,
                  utils::Regex &attributeNameRegex,
                  std::shared_ptr<Messages> messages,
-                 size_t flow_file_index)
-        : max_seg_size_(max_seg_size),
-          key_(key),
+                 const size_t flow_file_index)
+        : flowFile_(std::move(flowFile)),
+          flow_size_(flowFile_->getSize()),
+          max_seg_size_(max_seg_size == 0 || flow_size_ < max_seg_size ? flow_size_ : max_seg_size),
+          key_(std::move(key)),
           rkt_(rkt),
           rk_(rk),
-          flowFile_(flowFile),
+          hdrs(make_headers()),
           messages_(std::move(messages)),
           flow_file_index_(flow_file_index),
-          attributeNameRegex_(attributeNameRegex) {
-      flow_size_ = flowFile_->getSize();
-      status_ = 0;
-      read_size_ = 0;
-      hdrs = nullptr;
-    }
+          status_(0),
+          read_size_(0),
+          attributeNameRegex_(attributeNameRegex)
+    { }
 
     ~ReadCallback() {
       if (hdrs) {
         rd_kafka_headers_destroy(hdrs);
       }
     }
 
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
-      if (max_seg_size_ == 0U || flow_size_ < max_seg_size_) {
-        max_seg_size_ = flow_size_;
-      }
+    int64_t process(const std::shared_ptr<io::BaseStream> stream) {
       std::vector<unsigned char> buffer;
-      buffer.reserve(max_seg_size_);
+      buffer.resize(max_seg_size_);
       read_size_ = 0;
       status_ = 0;
       rd_kafka_resp_err_t err;
 
-      for (auto kv : flowFile_->getAttributes()) {
-        if(attributeNameRegex_.match(kv.first)) {
-          if (!hdrs) {
-            hdrs = rd_kafka_headers_new(8);
-          }
-          err = rd_kafka_header_add(hdrs, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
-        }
-      }
+      called_ = true;
 
       size_t segment_num = 0U;
+
+      assert(flow_size_ == 0 || max_seg_size_ != 0 && "at this point, max_seg_size_ is only zero if flow_size_ is zero");
 
 Review comment:
   It's an assertion, not a check. I don't agree with checking conditions that are always true and logging errors that can't happen.
   The fact that it never fires doesn't mean that it's useless. It's just as useful as a comment, except that it's written in code and makes the code a bit slower in debug mode.
   
   This assertion was meant to help the reader understand why checking only `max_seg_size_ == 0` (and not `flow_size_ == 0`) on the next line is OK and intentional.
   Would you be OK with replacing it with a comment, or with adding a comment to clarify its purpose?
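
In the quoted diff, `flow_size_ == 0 || max_seg_size_ != 0 && "..."` parses as `flow_size_ == 0 || (max_seg_size_ != 0 && "...")` because `&&` binds tighter than `||`. That is still logically equivalent, since a string literal converts to `true`, but GCC and Clang typically warn about it under `-Wparentheses`. The sketch below shows the assertion-as-documentation style with explicit grouping. `clamp_segment_size` is a hypothetical free-function version of the `ReadCallback` constructor's clamping. `assert()` is compiled out when `NDEBUG` is defined, so only debug builds pay for it.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical free-function version of the constructor's clamping logic.
std::uint64_t clamp_segment_size(std::uint64_t max_seg_size, std::uint64_t flow_size) {
  const std::uint64_t seg =
      (max_seg_size == 0 || flow_size < max_seg_size) ? flow_size : max_seg_size;
  // Executable documentation of the invariant callers rely on: the || is
  // grouped first, then the message string is attached with &&.
  assert((flow_size == 0 || seg != 0) && "seg is only zero if flow_size is zero");
  return seg;
}
```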
