You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/02/10 12:32:09 UTC

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka

szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r377034602
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.h
 ##########
 @@ -194,120 +187,142 @@ class PublishKafka : public core::Processor {
   // Nest Callback Class for read stream
   class ReadCallback : public InputStreamCallback {
    public:
-    ReadCallback(uint64_t max_seg_size,
-                 const std::string &key,
-                 rd_kafka_topic_t *rkt,
-                 rd_kafka_t *rk,
-                 const std::shared_ptr<core::FlowFile> flowFile,
+    struct rd_kafka_headers_deleter {
+      void operator()(rd_kafka_headers_t* ptr) const noexcept {
+        rd_kafka_headers_destroy(ptr);
+      }
+    };
+
+    using rd_kafka_headers_unique_ptr = std::unique_ptr<rd_kafka_headers_t, rd_kafka_headers_deleter>;
+
+   private:
+    void allocate_message_object(const size_t segment_num) const {
+      messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file) {
+        // allocate message object to be filled in by the callback in produce()
+        if (flow_file.messages.size() < segment_num + 1) {
+          flow_file.messages.resize(segment_num + 1);
+        }
+      });
+    }
+
+    static rd_kafka_headers_unique_ptr make_headers(const core::FlowFile& flow_file, utils::Regex& attribute_name_regex) {
+      const utils::owner<rd_kafka_headers_t*> result{ rd_kafka_headers_new(8) };
+      if (!result) { throw std::bad_alloc{}; }
+
+      for (const auto& kv : flow_file.getAttributes()) {
 
 Review comment:
   rebased on top of dfbe225, That should solve the issue

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services