Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/06/12 09:17:15 UTC

[GitHub] [nifi-minifi-cpp] adamdebreceni opened a new pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

adamdebreceni opened a new pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807


   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r452661701



##########
File path: libminifi/include/core/FlowFile.h
##########
@@ -35,9 +36,58 @@ namespace minifi {
 namespace core {
 
 class FlowFile : public core::Connectable, public ReferenceContainer {
+ private:
+  class FlowFileOwnedResourceClaimPtr{
+   public:
+    FlowFileOwnedResourceClaimPtr() = default;
+    explicit FlowFileOwnedResourceClaimPtr(const std::shared_ptr<ResourceClaim>& claim) : claim_(claim) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&& claim) : claim_(std::move(claim)) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) : claim_(ref.claim_) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(FlowFileOwnedResourceClaimPtr&& ref) : claim_(std::move(ref.claim_)) {
+      // taking ownership of claim, no need to increment/decrement
+    }
+    FlowFileOwnedResourceClaimPtr& operator=(const FlowFileOwnedResourceClaimPtr& ref) = delete;
+    FlowFileOwnedResourceClaimPtr& operator=(FlowFileOwnedResourceClaimPtr&& ref) = delete;
+
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const FlowFileOwnedResourceClaimPtr& ref) {
+      return set(owner, ref.claim_);
+    }
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const std::shared_ptr<ResourceClaim>& newClaim) {
+      auto oldClaim = claim_;
+      claim_ = newClaim;
+      // the order of increase/release is important
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+      if (oldClaim) owner.releaseClaim(oldClaim);
+      return *this;
+    }
+    const std::shared_ptr<ResourceClaim>& get() const {
+      return claim_;
+    }
+    const std::shared_ptr<ResourceClaim>& operator->() const {
+      return claim_;
+    }
+    operator bool() const noexcept {
+      return static_cast<bool>(claim_);
+    }
+    ~FlowFileOwnedResourceClaimPtr() {
+      // allow the owner FlowFile to manually release the claim
+      // while logging stuff and removing it from repositories
+      assert(!claim_);

Review comment:
       That would be great, but this destructor runs while the `FlowFile` base is being destroyed. `releaseClaim` is virtual and is overridden in `FlowFileRecord`, which holds the pointer to the `content_repo_`. By this point the `FlowFileRecord` part is already gone, so it is too late: we cannot call the derived class's method.
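
The constraint described above can be reproduced in isolation. The following is a minimal sketch with hypothetical `Base` and `Derived` types (they are stand-ins, not the MiNiFi classes): a virtual call made from the base destructor resolves to the base implementation, because the derived part of the object has already been destroyed. The same language rule applies to destructors of the base class's data members.

```cpp
#include <string>
#include <vector>

// Hypothetical stand-ins for illustration; these are not the MiNiFi classes.
std::vector<std::string> calls;

struct Base {
  virtual ~Base() {
    // The Derived part is already destroyed, so this resolves to Base::release.
    release();
  }
  virtual void release() { calls.push_back("Base::release"); }
};

struct Derived : Base {
  ~Derived() override {
    // The Derived part is still alive here, so the override is reachable.
    release();
  }
  void release() override { calls.push_back("Derived::release"); }
};

// Destroys one Derived object and returns the recorded call order.
std::vector<std::string> destroyOne() {
  calls.clear();
  { Derived d; }
  return calls;
}
```

This is why any cleanup that needs derived-class state has to happen in the derived destructor (or earlier), not in the base class or its members.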







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454836324



##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -540,111 +481,97 @@ void ProcessSession::import(const std::string& source, std::vector<std::shared_p
 
   std::vector<uint8_t> buffer(getpagesize());
   try {
-    try {
-      std::ifstream input{source, std::ios::in | std::ios::binary};
-      logger_->log_debug("Opening %s", source);
-      if (!input.is_open() || !input.good()) {
-        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+    std::ifstream input{source, std::ios::in | std::ios::binary};
+    logger_->log_debug("Opening %s", source);
+    if (!input.is_open() || !input.good()) {
+      throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+    }
+    if (offset != 0U) {
+      input.seekg(offset, std::ifstream::beg);
+      if (!input.good()) {
+        logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
+        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
       }
-      if (offset != 0U) {
-        input.seekg(offset, std::ifstream::beg);
-        if (!input.good()) {
-          logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
-          throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
-        }
+    }
+    uint64_t startTime = 0U;
+    while (input.good()) {
+      input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
+      std::streamsize read = input.gcount();
+      if (read < 0) {
+        throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
       }
-      uint64_t startTime = 0U;
-      while (input.good()) {
-        input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
-        std::streamsize read = input.gcount();
-        if (read < 0) {
-          throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
-        }
-        if (read == 0) {
-          logger_->log_trace("Finished reading input %s", source);
+      if (read == 0) {
+        logger_->log_trace("Finished reading input %s", source);
+        break;
+      } else {
+        logging::LOG_TRACE(logger_) << "Read input of " << read;
+      }
+      uint8_t* begin = buffer.data();
+      uint8_t* end = begin + read;
+      while (true) {
+        startTime = getTimeMillis();
+        uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
+        const auto len = gsl::narrow<int>(delimiterPos - begin);
+
+        logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
+        /*
+         * We do not want to process the rest of the buffer after the last delimiter if
+         *  - we have reached EOF in the file (we would discard it anyway)
+         *  - there is nothing to process (the last character in the buffer is a delimiter)
+         */
+        if (delimiterPos == end && (input.eof() || len == 0)) {
           break;
-        } else {
-          logging::LOG_TRACE(logger_) << "Read input of " << read;
         }
-        uint8_t* begin = buffer.data();
-        uint8_t* end = begin + read;
-        while (true) {
-          startTime = getTimeMillis();
-          uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
-          const auto len = gsl::narrow<int>(delimiterPos - begin);
-
-          logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
-          /*
-           * We do not want to process the rest of the buffer after the last delimiter if
-           *  - we have reached EOF in the file (we would discard it anyway)
-           *  - there is nothing to process (the last character in the buffer is a delimiter)
-           */
-          if (delimiterPos == end && (input.eof() || len == 0)) {
-            break;
-          }
-
-          /* Create claim and stream if needed and append data */
-          if (claim == nullptr) {
-            startTime = getTimeMillis();
-            claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
-          }
-          if (stream == nullptr) {
-            stream = process_context_->getContentRepository()->write(claim);
-          }
-          if (stream == nullptr) {
-            logger_->log_error("Stream is null");
-            rollback();
-            return;
-          }
-          if (stream->write(begin, len) != len) {
-            logger_->log_error("Error while writing");
-            stream->closeStream();
-            throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
-          }
 
-          /* Create a FlowFile if we reached a delimiter */
-          if (delimiterPos == end) {
-            break;
-          }
-          flowFile = std::static_pointer_cast<FlowFileRecord>(create());
-          flowFile->setSize(stream->getSize());
-          flowFile->setOffset(0);
-          if (flowFile->getResourceClaim() != nullptr) {
-            /* Remove the old claim */
-            flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-            flowFile->clearResourceClaim();
-          }
-          flowFile->setResourceClaim(claim);
-          claim->increaseFlowFileRecordOwnedCount();
-          logging::LOG_DEBUG(logger_) << "Import offset " << flowFile->getOffset() << " length " << flowFile->getSize() << " content " << flowFile->getResourceClaim()->getContentFullPath()
-                                      << ", FlowFile UUID " << flowFile->getUUIDStr();
+        /* Create claim and stream if needed and append data */
+        if (claim == nullptr) {
+          startTime = getTimeMillis();
+          claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
+        }
+        if (stream == nullptr) {
+          stream = process_context_->getContentRepository()->write(claim);
+        }
+        if (stream == nullptr) {
+          logger_->log_error("Stream is null");
+          rollback();
+          return;
+        }
+        if (stream->write(begin, len) != len) {
+          logger_->log_error("Error while writing");
           stream->closeStream();
-          std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
-          uint64_t endTime = getTimeMillis();
-          provenance_report_->modifyContent(flowFile, details, endTime - startTime);
-          flows.push_back(flowFile);
-
-          /* Reset these to start processing the next FlowFile with a clean slate */
-          flowFile.reset();
-          stream.reset();
-          claim.reset();
-
-          /* Skip delimiter */
-          begin = delimiterPos + 1;
+          throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
+        }
+
+        /* Create a FlowFile if we reached a delimiter */
+        if (delimiterPos == end) {
+          break;
         }
+        flowFile = std::static_pointer_cast<FlowFileRecord>(create());
+        flowFile->setSize(stream->getSize());
+        flowFile->setOffset(0);
+        flowFile->setResourceClaim(claim);
+        logging::LOG_DEBUG(logger_) << "Import offset " << flowFile->getOffset() << " length " << flowFile->getSize() << " content " << flowFile->getResourceClaim()->getContentFullPath()
+                                    << ", FlowFile UUID " << flowFile->getUUIDStr();
+        stream->closeStream();
+        std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
+        uint64_t endTime = getTimeMillis();
+        provenance_report_->modifyContent(flowFile, details, endTime - startTime);
+        flows.push_back(flowFile);
+
+        /* Reset these to start processing the next FlowFile with a clean slate */
+        flowFile.reset();
+        stream.reset();
+        claim.reset();
+
+        /* Skip delimiter */
+        begin = delimiterPos + 1;
       }
-    } catch (std::exception &exception) {
-      logger_->log_debug("Caught Exception %s", exception.what());
-      throw;
-    } catch (...) {
-      logger_->log_debug("Caught Exception during process session write");
-      throw;
     }
+  } catch (std::exception &exception) {
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
   } catch (...) {
-    if (flowFile != nullptr && claim != nullptr && flowFile->getResourceClaim() == claim) {
-      flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-      flowFile->clearResourceClaim();
-    }

Review comment:
       The whole point of this story is that once the FlowFile assumes ownership of a ResourceClaim, it should clean up after itself. Currently this cleanup lives in `FlowFileRecord::~FlowFileRecord`, which is arguably not the best place, but only a `FlowFileRecord` can notify the content repository.
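
As a rough illustration of that ownership idea, here is a simplified sketch. All names (`Claim`, `OwnedClaim`, `Record`, `releaseAndRemoveIfOrphaned`) are hypothetical, and the repository is reduced to a `removed` flag; the real classes in the PR differ. The derived-style `Record` destructor releases the claim explicitly, and `set` increments the new claim before releasing the old one. The stated reason for that order is an assumption: it keeps the count from touching zero when the same claim is set again.

```cpp
#include <memory>

// Hypothetical, simplified stand-ins; not the MiNiFi classes.
struct Claim {
  int owners = 0;
  bool removed = false;  // stands in for "removed from the content repository"
};

class OwnedClaim {
 public:
  // Swap in a new claim. Increment first, then release the old one, so that
  // re-setting the same claim never drops its owner count to zero in between.
  template <typename Release>
  void set(const std::shared_ptr<Claim>& next, Release release) {
    auto old = claim_;
    claim_ = next;
    if (claim_) ++claim_->owners;
    if (old) release(old);
  }
  const std::shared_ptr<Claim>& get() const { return claim_; }

 private:
  std::shared_ptr<Claim> claim_;
};

// Stand-in for the repository notification: drop one owner and mark the
// claim as removed once nobody owns it.
void releaseAndRemoveIfOrphaned(const std::shared_ptr<Claim>& claim) {
  if (--claim->owners == 0) claim->removed = true;
}

class Record {
 public:
  void setClaim(const std::shared_ptr<Claim>& claim) {
    claim_.set(claim, releaseAndRemoveIfOrphaned);
  }
  // The owner releases its claim itself, while it can still reach the
  // "repository".
  ~Record() { claim_.set(nullptr, releaseAndRemoveIfOrphaned); }

 private:
  OwnedClaim claim_;
};
```

With this shape, callers never adjust the owner count by hand; it changes only through `setClaim` and the destructor.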







[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r452334340



##########
File path: libminifi/test/BufferReader.h
##########
@@ -0,0 +1,51 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_BUFFERREADER_H
+#define NIFI_MINIFI_CPP_BUFFERREADER_H
+
+#include "FlowFileRecord.h"
+
+class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
+ public:
+  explicit BufferReader(std::vector<uint8_t>& buffer) : buffer_(buffer){}
+  template<class Input>
+  int write(Input input, std::size_t len) {

Review comment:
       feel free to mark this and all of the above as resolved







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r452135018



##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -478,15 +466,9 @@ void ProcessSession::import(std::string source, const std::shared_ptr<core::Flow
       throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
     }
   } catch (std::exception &exception) {
-    if (flow) {

Review comment:
       The behavior doesn't change (I had changed it, and have now changed it back): if the import fails, the flowFile retains its previous contentClaim.
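
One common way to get this "keep the old claim on failure" behavior is to build the new content completely and attach it only after everything has succeeded. The sketch below shows that pattern with hypothetical `Content`, `Flow`, `importInto` and `tryImport` names and a `failRead` flag that simulates a read error; it is not the code from `ProcessSession::import`.

```cpp
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-ins for illustration; not the MiNiFi classes.
struct Content {
  std::string data;
};

struct Flow {
  std::shared_ptr<Content> claim;
};

// Builds the new content first and attaches it only on success, so a throw
// leaves flow.claim exactly as it was.
void importInto(Flow& flow, const std::string& source, bool failRead) {
  auto fresh = std::make_shared<Content>();
  if (failRead) {
    throw std::runtime_error("File Import Error");  // simulated failure
  }
  fresh->data = source;
  flow.claim = std::move(fresh);  // commit point
}

// Returns true if the import succeeded, false if it threw.
bool tryImport(Flow& flow, const std::string& source, bool failRead) {
  try {
    importInto(flow, source, failRead);
    return true;
  } catch (const std::exception&) {
    return false;
  }
}
```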







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r452725222



##########
File path: libminifi/src/FlowFileRecord.cpp
##########
@@ -118,28 +110,27 @@ FlowFileRecord::~FlowFileRecord() {
     logger_->log_debug("Delete FlowFile UUID %s", uuidStr_);
   else
     logger_->log_debug("Delete SnapShot FlowFile UUID %s", uuidStr_);
-  if (claim_) {
-    releaseClaim(claim_);
-  } else {
+
+  if (!claim_) {
     logger_->log_debug("Claim is null ptr for %s", uuidStr_);
   }
 
+  claim_.set(*this, nullptr);
+
   // Disown stash claims
-  for (const auto &stashPair : stashedContent_) {
-    releaseClaim(stashPair.second);
+  for (auto &stashPair : stashedContent_) {
+    auto& stashClaim = stashPair.second;
+    stashClaim.set(*this, nullptr);
   }
 }
 
 void FlowFileRecord::releaseClaim(std::shared_ptr<ResourceClaim> claim) {
   // Decrease the flow file record owned count for the resource claim
-  claim_->decreaseFlowFileRecordOwnedCount();
-  std::string value;
-  logger_->log_debug("Delete Resource Claim %s, %s, attempt %llu", getUUIDStr(), claim_->getContentFullPath(), claim_->getFlowFileRecordOwnedCount());
-  if (claim_->getFlowFileRecordOwnedCount() <= 0) {
-    // we cannot rely on the stored variable here since we aren't guaranteed atomicity
-    if (flow_repository_ != nullptr && !flow_repository_->Get(uuidStr_, value)) {
-      logger_->log_debug("Delete Resource Claim %s", claim_->getContentFullPath());
-      content_repo_->remove(claim_);
+  claim->decreaseFlowFileRecordOwnedCount();
+  logger_->log_debug("Detaching Resource Claim %s, %s, attempt %llu", getUUIDStr(), claim->getContentFullPath(), claim->getFlowFileRecordOwnedCount());
+  if (content_repo_) {
+    if (content_repo_->removeIfOrphaned(claim)) {

Review comment:
       done







[GitHub] [nifi-minifi-cpp] szaszm commented on pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
szaszm commented on pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#issuecomment-661739832


   What bug and what segfault were fixed in the last 2 commits?





[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r448255811



##########
File path: libminifi/test/BufferReader.h
##########
@@ -0,0 +1,51 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_BUFFERREADER_H
+#define NIFI_MINIFI_CPP_BUFFERREADER_H
+
+#include "FlowFileRecord.h"
+
+class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
+ public:
+  explicit BufferReader(std::vector<uint8_t>& buffer) : buffer_(buffer){}
+  template<class Input>
+  int write(Input input, std::size_t len) {

Review comment:
       done
   
   (I don't remember what I was planning with the template)










[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454833537



##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -248,35 +237,27 @@ void ProcessSession::penalize(const std::shared_ptr<core::FlowFile> &flow) {
 void ProcessSession::transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship) {
   logging::LOG_INFO(logger_) << "Transferring " << flow->getUUIDStr() << " from " << process_context_->getProcessorNode()->getName() << " to relationship " << relationship.getName();
   _transferRelationship[flow->getUUIDStr()] = relationship;
+  flow->setDeleted(false);

Review comment:
       During `commit` and `rollback` we check whether the items in `_deletedFlowFiles` indeed stayed deleted, or whether a later transfer or add marked them for "resurrection".
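
The check described here can be pictured with a small sketch. `Flow`, `Session` and the `deleted_` list are hypothetical simplifications of the session bookkeeping, not the actual `ProcessSession` members: a remove marks the flow file as deleted, a later transfer clears the mark, and commit only counts the entries that are still marked.

```cpp
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical stand-ins for illustration; not the MiNiFi classes.
struct Flow {
  bool deleted = false;
};

class Session {
 public:
  void remove(const std::shared_ptr<Flow>& flow) {
    flow->deleted = true;
    deleted_.push_back(flow);
  }
  void transfer(const std::shared_ptr<Flow>& flow) {
    flow->deleted = false;  // "resurrects" a previously removed flow file
  }
  // Returns how many removed flow files were still marked deleted at commit.
  std::size_t commit() {
    std::size_t stillDeleted = 0;
    for (const auto& flow : deleted_) {
      if (flow->deleted) ++stillDeleted;  // only these would be cleaned up
    }
    deleted_.clear();
    return stillDeleted;
  }

 private:
  std::vector<std::shared_ptr<Flow>> deleted_;
};
```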








[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454871893



##########
File path: extensions/rocksdb-repos/FlowFileRepository.cpp
##########
@@ -148,22 +148,27 @@ void FlowFileRepository::prune_stored_flowfiles() {
     std::string key = it->key().ToString();
     if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) {
       logger_->log_debug("Found connection for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath());
-      auto search = connectionMap.find(eventRead->getConnectionUuid());
-      if (!corrupt_checkpoint && search != connectionMap.end()) {
+      bool found = false;
+      auto search = containers.find(eventRead->getConnectionUuid());
+      found = (search != containers.end());
+      if (!found) {
+        // for backward compatibility
+        search = connectionMap.find(eventRead->getConnectionUuid());
+        found = (search != connectionMap.end());
+      }

Review comment:
       done







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454874069



##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -540,111 +481,97 @@ void ProcessSession::import(const std::string& source, std::vector<std::shared_p
 
   std::vector<uint8_t> buffer(getpagesize());
   try {
-    try {
-      std::ifstream input{source, std::ios::in | std::ios::binary};
-      logger_->log_debug("Opening %s", source);
-      if (!input.is_open() || !input.good()) {
-        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+    std::ifstream input{source, std::ios::in | std::ios::binary};
+    logger_->log_debug("Opening %s", source);
+    if (!input.is_open() || !input.good()) {
+      throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+    }
+    if (offset != 0U) {
+      input.seekg(offset, std::ifstream::beg);
+      if (!input.good()) {
+        logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
+        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
       }
-      if (offset != 0U) {
-        input.seekg(offset, std::ifstream::beg);
-        if (!input.good()) {
-          logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
-          throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
-        }
+    }
+    uint64_t startTime = 0U;
+    while (input.good()) {
+      input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
+      std::streamsize read = input.gcount();
+      if (read < 0) {
+        throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
       }
-      uint64_t startTime = 0U;
-      while (input.good()) {
-        input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
-        std::streamsize read = input.gcount();
-        if (read < 0) {
-          throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
-        }
-        if (read == 0) {
-          logger_->log_trace("Finished reading input %s", source);
+      if (read == 0) {
+        logger_->log_trace("Finished reading input %s", source);
+        break;
+      } else {
+        logging::LOG_TRACE(logger_) << "Read input of " << read;
+      }
+      uint8_t* begin = buffer.data();
+      uint8_t* end = begin + read;
+      while (true) {
+        startTime = getTimeMillis();
+        uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
+        const auto len = gsl::narrow<int>(delimiterPos - begin);
+
+        logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
+        /*
+         * We do not want to process the rest of the buffer after the last delimiter if
+         *  - we have reached EOF in the file (we would discard it anyway)
+         *  - there is nothing to process (the last character in the buffer is a delimiter)
+         */
+        if (delimiterPos == end && (input.eof() || len == 0)) {
           break;
-        } else {
-          logging::LOG_TRACE(logger_) << "Read input of " << read;
         }
-        uint8_t* begin = buffer.data();
-        uint8_t* end = begin + read;
-        while (true) {
-          startTime = getTimeMillis();
-          uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
-          const auto len = gsl::narrow<int>(delimiterPos - begin);
-
-          logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
-          /*
-           * We do not want to process the rest of the buffer after the last delimiter if
-           *  - we have reached EOF in the file (we would discard it anyway)
-           *  - there is nothing to process (the last character in the buffer is a delimiter)
-           */
-          if (delimiterPos == end && (input.eof() || len == 0)) {
-            break;
-          }
-
-          /* Create claim and stream if needed and append data */
-          if (claim == nullptr) {
-            startTime = getTimeMillis();
-            claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
-          }
-          if (stream == nullptr) {
-            stream = process_context_->getContentRepository()->write(claim);
-          }
-          if (stream == nullptr) {
-            logger_->log_error("Stream is null");
-            rollback();
-            return;
-          }
-          if (stream->write(begin, len) != len) {
-            logger_->log_error("Error while writing");
-            stream->closeStream();
-            throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
-          }
 
-          /* Create a FlowFile if we reached a delimiter */
-          if (delimiterPos == end) {
-            break;
-          }
-          flowFile = std::static_pointer_cast<FlowFileRecord>(create());
-          flowFile->setSize(stream->getSize());
-          flowFile->setOffset(0);
-          if (flowFile->getResourceClaim() != nullptr) {
-            /* Remove the old claim */
-            flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-            flowFile->clearResourceClaim();
-          }
-          flowFile->setResourceClaim(claim);
-          claim->increaseFlowFileRecordOwnedCount();
-          logging::LOG_DEBUG(logger_) << "Import offset " << flowFile->getOffset() << " length " << flowFile->getSize() << " content " << flowFile->getResourceClaim()->getContentFullPath()
-                                      << ", FlowFile UUID " << flowFile->getUUIDStr();
+        /* Create claim and stream if needed and append data */
+        if (claim == nullptr) {
+          startTime = getTimeMillis();
+          claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
+        }
+        if (stream == nullptr) {
+          stream = process_context_->getContentRepository()->write(claim);
+        }
+        if (stream == nullptr) {
+          logger_->log_error("Stream is null");
+          rollback();
+          return;
+        }
+        if (stream->write(begin, len) != len) {
+          logger_->log_error("Error while writing");
           stream->closeStream();
-          std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
-          uint64_t endTime = getTimeMillis();
-          provenance_report_->modifyContent(flowFile, details, endTime - startTime);
-          flows.push_back(flowFile);
-
-          /* Reset these to start processing the next FlowFile with a clean slate */
-          flowFile.reset();
-          stream.reset();
-          claim.reset();
-
-          /* Skip delimiter */
-          begin = delimiterPos + 1;
+          throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
+        }
+
+        /* Create a FlowFile if we reached a delimiter */
+        if (delimiterPos == end) {
+          break;
         }
+        flowFile = std::static_pointer_cast<FlowFileRecord>(create());
+        flowFile->setSize(stream->getSize());
+        flowFile->setOffset(0);
+        flowFile->setResourceClaim(claim);
+        logging::LOG_DEBUG(logger_) << "Import offset " << flowFile->getOffset() << " length " << flowFile->getSize() << " content " << flowFile->getResourceClaim()->getContentFullPath()
+                                    << ", FlowFile UUID " << flowFile->getUUIDStr();
+        stream->closeStream();
+        std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
+        uint64_t endTime = getTimeMillis();
+        provenance_report_->modifyContent(flowFile, details, endTime - startTime);
+        flows.push_back(flowFile);
+
+        /* Reset these to start processing the next FlowFile with a clean slate */
+        flowFile.reset();
+        stream.reset();
+        claim.reset();
+
+        /* Skip delimiter */
+        begin = delimiterPos + 1;
       }
-    } catch (std::exception &exception) {
-      logger_->log_debug("Caught Exception %s", exception.what());
-      throw;
-    } catch (...) {
-      logger_->log_debug("Caught Exception during process session write");
-      throw;
     }
+  } catch (std::exception &exception) {
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
   } catch (...) {
-    if (flowFile != nullptr && claim != nullptr && flowFile->getResourceClaim() == claim) {
-      flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-      flowFile->clearResourceClaim();
-    }

Review comment:
       I just realized that `ResourceClaim` also holds a reference to a `StreamManager<ResourceClaim>`, so it is beyond me why `ResourceClaim` doesn't clean up after itself.
   
   Could it be that the `content_repo_` in the `FlowFileRecord` is different from the one in the `ResourceClaim`?
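
To make the question concrete, here is a minimal sketch of what "cleaning up after itself" could look like: a claim that registers its content on construction and removes it in its destructor. `Repo`, `Claim`, `store`, `remove` and `contains` are hypothetical names for illustration, not the MiNiFi API. In particular, the sketch assumes the repository can be addressed by a path string, which sidesteps the `shared_ptr<ResourceClaim>` parameters of the real `StreamManager`.

```cpp
#include <cassert>
#include <memory>
#include <set>
#include <string>
#include <utility>

// Hypothetical stand-in for a content repository; not the MiNiFi class.
class Repo {
 public:
  void store(const std::string& path) { stored_.insert(path); }
  void remove(const std::string& path) { stored_.erase(path); }
  bool contains(const std::string& path) const { return stored_.count(path) != 0; }

 private:
  std::set<std::string> stored_;
};

// Hypothetical claim that owns its content for as long as it lives.
class Claim {
 public:
  Claim(std::shared_ptr<Repo> repo, std::string path)
      : repo_(std::move(repo)), path_(std::move(path)) {
    repo_->store(path_);
  }
  // The destructor is the single place where cleanup happens.
  ~Claim() { repo_->remove(path_); }

  Claim(const Claim&) = delete;
  Claim& operator=(const Claim&) = delete;

 private:
  std::shared_ptr<Repo> repo_;
  std::string path_;
};

// Returns true if the content was present while the claim lived
// and absent once the last owner released it.
bool claim_cleans_up_after_itself() {
  auto repo = std::make_shared<Repo>();
  bool present_while_alive = false;
  {
    auto claim = std::make_shared<Claim>(repo, "claim-1");
    auto second_owner = claim;  // e.g. a flow file sharing the same claim
    present_while_alive = repo->contains("claim-1");
  }  // both owners are gone here, so ~Claim() runs exactly once
  return present_while_alive && !repo->contains("claim-1");
}
```

With this shape no caller has to pair an increase with a decrease by hand; the owner count of the `shared_ptr` plays the role of the manual counter.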







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r455004287



##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -540,111 +481,97 @@ void ProcessSession::import(const std::string& source, std::vector<std::shared_p
 
   std::vector<uint8_t> buffer(getpagesize());
   try {
-    try {
-      std::ifstream input{source, std::ios::in | std::ios::binary};
-      logger_->log_debug("Opening %s", source);
-      if (!input.is_open() || !input.good()) {
-        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+    std::ifstream input{source, std::ios::in | std::ios::binary};
+    logger_->log_debug("Opening %s", source);
+    if (!input.is_open() || !input.good()) {
+      throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+    }
+    if (offset != 0U) {
+      input.seekg(offset, std::ifstream::beg);
+      if (!input.good()) {
+        logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
+        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
       }
-      if (offset != 0U) {
-        input.seekg(offset, std::ifstream::beg);
-        if (!input.good()) {
-          logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
-          throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
-        }
+    }
+    uint64_t startTime = 0U;
+    while (input.good()) {
+      input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
+      std::streamsize read = input.gcount();
+      if (read < 0) {
+        throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
       }
-      uint64_t startTime = 0U;
-      while (input.good()) {
-        input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
-        std::streamsize read = input.gcount();
-        if (read < 0) {
-          throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
-        }
-        if (read == 0) {
-          logger_->log_trace("Finished reading input %s", source);
+      if (read == 0) {
+        logger_->log_trace("Finished reading input %s", source);
+        break;
+      } else {
+        logging::LOG_TRACE(logger_) << "Read input of " << read;
+      }
+      uint8_t* begin = buffer.data();
+      uint8_t* end = begin + read;
+      while (true) {
+        startTime = getTimeMillis();
+        uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
+        const auto len = gsl::narrow<int>(delimiterPos - begin);
+
+        logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
+        /*
+         * We do not want to process the rest of the buffer after the last delimiter if
+         *  - we have reached EOF in the file (we would discard it anyway)
+         *  - there is nothing to process (the last character in the buffer is a delimiter)
+         */
+        if (delimiterPos == end && (input.eof() || len == 0)) {
           break;
-        } else {
-          logging::LOG_TRACE(logger_) << "Read input of " << read;
         }
-        uint8_t* begin = buffer.data();
-        uint8_t* end = begin + read;
-        while (true) {
-          startTime = getTimeMillis();
-          uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
-          const auto len = gsl::narrow<int>(delimiterPos - begin);
-
-          logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
-          /*
-           * We do not want to process the rest of the buffer after the last delimiter if
-           *  - we have reached EOF in the file (we would discard it anyway)
-           *  - there is nothing to process (the last character in the buffer is a delimiter)
-           */
-          if (delimiterPos == end && (input.eof() || len == 0)) {
-            break;
-          }
-
-          /* Create claim and stream if needed and append data */
-          if (claim == nullptr) {
-            startTime = getTimeMillis();
-            claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
-          }
-          if (stream == nullptr) {
-            stream = process_context_->getContentRepository()->write(claim);
-          }
-          if (stream == nullptr) {
-            logger_->log_error("Stream is null");
-            rollback();
-            return;
-          }
-          if (stream->write(begin, len) != len) {
-            logger_->log_error("Error while writing");
-            stream->closeStream();
-            throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
-          }
 
-          /* Create a FlowFile if we reached a delimiter */
-          if (delimiterPos == end) {
-            break;
-          }
-          flowFile = std::static_pointer_cast<FlowFileRecord>(create());
-          flowFile->setSize(stream->getSize());
-          flowFile->setOffset(0);
-          if (flowFile->getResourceClaim() != nullptr) {
-            /* Remove the old claim */
-            flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-            flowFile->clearResourceClaim();
-          }
-          flowFile->setResourceClaim(claim);
-          claim->increaseFlowFileRecordOwnedCount();
-          logging::LOG_DEBUG(logger_) << "Import offset " << flowFile->getOffset() << " length " << flowFile->getSize() << " content " << flowFile->getResourceClaim()->getContentFullPath()
-                                      << ", FlowFile UUID " << flowFile->getUUIDStr();
+        /* Create claim and stream if needed and append data */
+        if (claim == nullptr) {
+          startTime = getTimeMillis();
+          claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
+        }
+        if (stream == nullptr) {
+          stream = process_context_->getContentRepository()->write(claim);
+        }
+        if (stream == nullptr) {
+          logger_->log_error("Stream is null");
+          rollback();
+          return;
+        }
+        if (stream->write(begin, len) != len) {
+          logger_->log_error("Error while writing");
           stream->closeStream();
-          std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
-          uint64_t endTime = getTimeMillis();
-          provenance_report_->modifyContent(flowFile, details, endTime - startTime);
-          flows.push_back(flowFile);
-
-          /* Reset these to start processing the next FlowFile with a clean slate */
-          flowFile.reset();
-          stream.reset();
-          claim.reset();
-
-          /* Skip delimiter */
-          begin = delimiterPos + 1;
+          throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
+        }
+
+        /* Create a FlowFile if we reached a delimiter */
+        if (delimiterPos == end) {
+          break;
         }
+        flowFile = std::static_pointer_cast<FlowFileRecord>(create());
+        flowFile->setSize(stream->getSize());
+        flowFile->setOffset(0);
+        flowFile->setResourceClaim(claim);
+        logging::LOG_DEBUG(logger_) << "Import offset " << flowFile->getOffset() << " length " << flowFile->getSize() << " content " << flowFile->getResourceClaim()->getContentFullPath()
+                                    << ", FlowFile UUID " << flowFile->getUUIDStr();
+        stream->closeStream();
+        std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
+        uint64_t endTime = getTimeMillis();
+        provenance_report_->modifyContent(flowFile, details, endTime - startTime);
+        flows.push_back(flowFile);
+
+        /* Reset these to start processing the next FlowFile with a clean slate */
+        flowFile.reset();
+        stream.reset();
+        claim.reset();
+
+        /* Skip delimiter */
+        begin = delimiterPos + 1;
       }
-    } catch (std::exception &exception) {
-      logger_->log_debug("Caught Exception %s", exception.what());
-      throw;
-    } catch (...) {
-      logger_->log_debug("Caught Exception during process session write");
-      throw;
     }
+  } catch (std::exception &exception) {
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
   } catch (...) {
-    if (flowFile != nullptr && claim != nullptr && flowFile->getResourceClaim() == claim) {
-      flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-      flowFile->clearResourceClaim();
-    }

Review comment:
       Before `1.0.0` we should migrate the cleanup logic to `ResourceClaim`. Currently this is not feasible: all methods in `StreamManager` expect a `shared_ptr<ResourceClaim>`, and neither the constructor nor the destructor has the means to summon one 😞
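
The constraint can be illustrated with a small probe, assuming C++17 (for `weak_from_this()`). `Probe`, `Flags` and `probe_lifetime` are hypothetical names used only for this sketch. It records, at three points of an object's lifetime, whether a `shared_ptr` to the object itself could be obtained.

```cpp
#include <cassert>
#include <memory>

// Records whether a shared_ptr to the object was obtainable at each point.
// The constructor and destructor flags start as true so that the probe
// has to actively clear them.
struct Flags {
  bool in_constructor = true;
  bool in_method = false;
  bool in_destructor = true;
};

// Hypothetical probe type; not part of MiNiFi.
class Probe : public std::enable_shared_from_this<Probe> {
 public:
  explicit Probe(Flags& flags) : flags_(flags) {
    // No shared_ptr owns the object yet, so the weak reference is still empty.
    flags_.in_constructor = !weak_from_this().expired();
  }
  ~Probe() {
    // The last owner has already let go, so the weak reference has expired.
    flags_.in_destructor = !weak_from_this().expired();
  }
  void check() {
    // Called through a live shared_ptr, so the weak reference is valid.
    flags_.in_method = !weak_from_this().expired();
  }

 private:
  Flags& flags_;
};

// Creates a Probe, calls check() on it, destroys it, and returns the flags.
Flags probe_lifetime() {
  Flags flags;
  {
    auto probe = std::make_shared<Probe>(flags);
    probe->check();
  }
  return flags;
}
```

Only the call made through a live `shared_ptr` can produce another owner. That is why an interface that accepts nothing but `shared_ptr<ResourceClaim>` is hard to call from a constructor or destructor.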







[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r456383996



##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -540,111 +481,97 @@ void ProcessSession::import(const std::string& source, std::vector<std::shared_p
 
   std::vector<uint8_t> buffer(getpagesize());
   try {
-    try {
-      std::ifstream input{source, std::ios::in | std::ios::binary};
-      logger_->log_debug("Opening %s", source);
-      if (!input.is_open() || !input.good()) {
-        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+    std::ifstream input{source, std::ios::in | std::ios::binary};
+    logger_->log_debug("Opening %s", source);
+    if (!input.is_open() || !input.good()) {
+      throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+    }
+    if (offset != 0U) {
+      input.seekg(offset, std::ifstream::beg);
+      if (!input.good()) {
+        logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
+        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
       }
-      if (offset != 0U) {
-        input.seekg(offset, std::ifstream::beg);
-        if (!input.good()) {
-          logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
-          throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
-        }
+    }
+    uint64_t startTime = 0U;
+    while (input.good()) {
+      input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
+      std::streamsize read = input.gcount();
+      if (read < 0) {
+        throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
       }
-      uint64_t startTime = 0U;
-      while (input.good()) {
-        input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
-        std::streamsize read = input.gcount();
-        if (read < 0) {
-          throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
-        }
-        if (read == 0) {
-          logger_->log_trace("Finished reading input %s", source);
+      if (read == 0) {
+        logger_->log_trace("Finished reading input %s", source);
+        break;
+      } else {
+        logging::LOG_TRACE(logger_) << "Read input of " << read;
+      }
+      uint8_t* begin = buffer.data();
+      uint8_t* end = begin + read;
+      while (true) {
+        startTime = getTimeMillis();
+        uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
+        const auto len = gsl::narrow<int>(delimiterPos - begin);
+
+        logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
+        /*
+         * We do not want to process the rest of the buffer after the last delimiter if
+         *  - we have reached EOF in the file (we would discard it anyway)
+         *  - there is nothing to process (the last character in the buffer is a delimiter)
+         */
+        if (delimiterPos == end && (input.eof() || len == 0)) {
           break;
-        } else {
-          logging::LOG_TRACE(logger_) << "Read input of " << read;
         }
-        uint8_t* begin = buffer.data();
-        uint8_t* end = begin + read;
-        while (true) {
-          startTime = getTimeMillis();
-          uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
-          const auto len = gsl::narrow<int>(delimiterPos - begin);
-
-          logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
-          /*
-           * We do not want to process the rest of the buffer after the last delimiter if
-           *  - we have reached EOF in the file (we would discard it anyway)
-           *  - there is nothing to process (the last character in the buffer is a delimiter)
-           */
-          if (delimiterPos == end && (input.eof() || len == 0)) {
-            break;
-          }
-
-          /* Create claim and stream if needed and append data */
-          if (claim == nullptr) {
-            startTime = getTimeMillis();
-            claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
-          }
-          if (stream == nullptr) {
-            stream = process_context_->getContentRepository()->write(claim);
-          }
-          if (stream == nullptr) {
-            logger_->log_error("Stream is null");
-            rollback();
-            return;
-          }
-          if (stream->write(begin, len) != len) {
-            logger_->log_error("Error while writing");
-            stream->closeStream();
-            throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
-          }
 
-          /* Create a FlowFile if we reached a delimiter */
-          if (delimiterPos == end) {
-            break;
-          }
-          flowFile = std::static_pointer_cast<FlowFileRecord>(create());
-          flowFile->setSize(stream->getSize());
-          flowFile->setOffset(0);
-          if (flowFile->getResourceClaim() != nullptr) {
-            /* Remove the old claim */
-            flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-            flowFile->clearResourceClaim();
-          }
-          flowFile->setResourceClaim(claim);
-          claim->increaseFlowFileRecordOwnedCount();
-          logging::LOG_DEBUG(logger_) << "Import offset " << flowFile->getOffset() << " length " << flowFile->getSize() << " content " << flowFile->getResourceClaim()->getContentFullPath()
-                                      << ", FlowFile UUID " << flowFile->getUUIDStr();
+        /* Create claim and stream if needed and append data */
+        if (claim == nullptr) {
+          startTime = getTimeMillis();
+          claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
+        }
+        if (stream == nullptr) {
+          stream = process_context_->getContentRepository()->write(claim);
+        }
+        if (stream == nullptr) {
+          logger_->log_error("Stream is null");
+          rollback();
+          return;
+        }
+        if (stream->write(begin, len) != len) {
+          logger_->log_error("Error while writing");
           stream->closeStream();
-          std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
-          uint64_t endTime = getTimeMillis();
-          provenance_report_->modifyContent(flowFile, details, endTime - startTime);
-          flows.push_back(flowFile);
-
-          /* Reset these to start processing the next FlowFile with a clean slate */
-          flowFile.reset();
-          stream.reset();
-          claim.reset();
-
-          /* Skip delimiter */
-          begin = delimiterPos + 1;
+          throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
+        }
+
+        /* Create a FlowFile if we reached a delimiter */
+        if (delimiterPos == end) {
+          break;
         }
+        flowFile = std::static_pointer_cast<FlowFileRecord>(create());
+        flowFile->setSize(stream->getSize());
+        flowFile->setOffset(0);
+        flowFile->setResourceClaim(claim);
+        logging::LOG_DEBUG(logger_) << "Import offset " << flowFile->getOffset() << " length " << flowFile->getSize() << " content " << flowFile->getResourceClaim()->getContentFullPath()
+                                    << ", FlowFile UUID " << flowFile->getUUIDStr();
+        stream->closeStream();
+        std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
+        uint64_t endTime = getTimeMillis();
+        provenance_report_->modifyContent(flowFile, details, endTime - startTime);
+        flows.push_back(flowFile);
+
+        /* Reset these to start processing the next FlowFile with a clean slate */
+        flowFile.reset();
+        stream.reset();
+        claim.reset();
+
+        /* Skip delimiter */
+        begin = delimiterPos + 1;
       }
-    } catch (std::exception &exception) {
-      logger_->log_debug("Caught Exception %s", exception.what());
-      throw;
-    } catch (...) {
-      logger_->log_debug("Caught Exception during process session write");
-      throw;
     }
+  } catch (std::exception &exception) {
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
   } catch (...) {
-    if (flowFile != nullptr && claim != nullptr && flowFile->getResourceClaim() == claim) {
-      flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-      flowFile->clearResourceClaim();
-    }

Review comment:
       I don't think that the `content_repo_` in the `FlowFileRecord` is different from the one in `ResourceClaim`. I agree with the plans to migrate the cleanup logic to `ResourceClaim`, finally giving us complete and encapsulated RAII semantics.
   
   We have the means to summon a `std::shared_ptr<ResourceClaim>` from `ResourceClaim::~ResourceClaim` by calling `shared_from_this()`.
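
One caveat is worth checking before relying on this. By the time the destructor runs, the last owning `shared_ptr` has already been released, and in C++17 `shared_from_this()` then throws `std::bad_weak_ptr`. A possible alternative, sketched below under the assumption that the manager could accept a plain reference, is to attach the cleanup to the `shared_ptr` as a custom deleter. `Claim`, `Manager` and `make_claim` are hypothetical names, not the MiNiFi API.

```cpp
#include <cassert>
#include <memory>
#include <set>
#include <string>
#include <utility>

// Hypothetical claim: a plain value type with no cleanup logic of its own.
struct Claim {
  std::string path;
};

// Hypothetical manager whose methods take a reference instead of a shared_ptr.
class Manager {
 public:
  void store(const Claim& claim) { stored_.insert(claim.path); }
  void remove(const Claim& claim) { stored_.erase(claim.path); }
  bool contains(const std::string& path) const { return stored_.count(path) != 0; }

 private:
  std::set<std::string> stored_;
};

// Factory that ties the cleanup to the shared_ptr itself.
std::shared_ptr<Claim> make_claim(std::shared_ptr<Manager> manager, std::string path) {
  std::unique_ptr<Claim> claim(new Claim{std::move(path)});
  manager->store(*claim);
  // The deleter runs when the last owner goes away, while the Claim
  // object is still fully alive, and only then deletes it.
  return std::shared_ptr<Claim>(claim.release(), [manager](Claim* c) {
    manager->remove(*c);
    delete c;
  });
}

// Returns true if the content is present while a claim is owned
// and removed once the last owner is gone.
bool deleter_cleans_up() {
  auto manager = std::make_shared<Manager>();
  {
    auto claim = make_claim(manager, "claim-1");
    if (!manager->contains("claim-1")) return false;
  }
  return !manager->contains("claim-1");
}
```

The trade-off is that every claim has to be created through the factory, since a claim constructed directly would carry no cleanup.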







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r448254564



##########
File path: libminifi/include/core/ContentRepository.h
##########
@@ -104,7 +104,7 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim> {
     if (count != count_map_.end() && count->second > 0) {
       count_map_[str] = count->second - 1;
     } else {
-	count_map_.erase(str);
+	    count_map_.erase(str);

Review comment:
       done

##########
File path: extensions/libarchive/BinFiles.cpp
##########
@@ -70,6 +71,8 @@ void BinFiles::initialize() {
   relationships.insert(Original);
   relationships.insert(Failure);
   setSupportedRelationships(relationships);
+
+  out_going_connections_[Self.getName()].insert(shared_from_this());

Review comment:
       done







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454874069



##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -540,111 +481,97 @@ void ProcessSession::import(const std::string& source, std::vector<std::shared_p
 
   std::vector<uint8_t> buffer(getpagesize());
   try {
-    try {
-      std::ifstream input{source, std::ios::in | std::ios::binary};
-      logger_->log_debug("Opening %s", source);
-      if (!input.is_open() || !input.good()) {
-        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+    std::ifstream input{source, std::ios::in | std::ios::binary};
+    logger_->log_debug("Opening %s", source);
+    if (!input.is_open() || !input.good()) {
+      throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+    }
+    if (offset != 0U) {
+      input.seekg(offset, std::ifstream::beg);
+      if (!input.good()) {
+        logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
+        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
       }
-      if (offset != 0U) {
-        input.seekg(offset, std::ifstream::beg);
-        if (!input.good()) {
-          logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
-          throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
-        }
+    }
+    uint64_t startTime = 0U;
+    while (input.good()) {
+      input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
+      std::streamsize read = input.gcount();
+      if (read < 0) {
+        throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
       }
-      uint64_t startTime = 0U;
-      while (input.good()) {
-        input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
-        std::streamsize read = input.gcount();
-        if (read < 0) {
-          throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
-        }
-        if (read == 0) {
-          logger_->log_trace("Finished reading input %s", source);
+      if (read == 0) {
+        logger_->log_trace("Finished reading input %s", source);
+        break;
+      } else {
+        logging::LOG_TRACE(logger_) << "Read input of " << read;
+      }
+      uint8_t* begin = buffer.data();
+      uint8_t* end = begin + read;
+      while (true) {
+        startTime = getTimeMillis();
+        uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
+        const auto len = gsl::narrow<int>(delimiterPos - begin);
+
+        logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
+        /*
+         * We do not want to process the rest of the buffer after the last delimiter if
+         *  - we have reached EOF in the file (we would discard it anyway)
+         *  - there is nothing to process (the last character in the buffer is a delimiter)
+         */
+        if (delimiterPos == end && (input.eof() || len == 0)) {
           break;
-        } else {
-          logging::LOG_TRACE(logger_) << "Read input of " << read;
         }
-        uint8_t* begin = buffer.data();
-        uint8_t* end = begin + read;
-        while (true) {
-          startTime = getTimeMillis();
-          uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
-          const auto len = gsl::narrow<int>(delimiterPos - begin);
-
-          logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
-          /*
-           * We do not want to process the rest of the buffer after the last delimiter if
-           *  - we have reached EOF in the file (we would discard it anyway)
-           *  - there is nothing to process (the last character in the buffer is a delimiter)
-           */
-          if (delimiterPos == end && (input.eof() || len == 0)) {
-            break;
-          }
-
-          /* Create claim and stream if needed and append data */
-          if (claim == nullptr) {
-            startTime = getTimeMillis();
-            claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
-          }
-          if (stream == nullptr) {
-            stream = process_context_->getContentRepository()->write(claim);
-          }
-          if (stream == nullptr) {
-            logger_->log_error("Stream is null");
-            rollback();
-            return;
-          }
-          if (stream->write(begin, len) != len) {
-            logger_->log_error("Error while writing");
-            stream->closeStream();
-            throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
-          }
 
-          /* Create a FlowFile if we reached a delimiter */
-          if (delimiterPos == end) {
-            break;
-          }
-          flowFile = std::static_pointer_cast<FlowFileRecord>(create());
-          flowFile->setSize(stream->getSize());
-          flowFile->setOffset(0);
-          if (flowFile->getResourceClaim() != nullptr) {
-            /* Remove the old claim */
-            flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-            flowFile->clearResourceClaim();
-          }
-          flowFile->setResourceClaim(claim);
-          claim->increaseFlowFileRecordOwnedCount();
-          logging::LOG_DEBUG(logger_) << "Import offset " << flowFile->getOffset() << " length " << flowFile->getSize() << " content " << flowFile->getResourceClaim()->getContentFullPath()
-                                      << ", FlowFile UUID " << flowFile->getUUIDStr();
+        /* Create claim and stream if needed and append data */
+        if (claim == nullptr) {
+          startTime = getTimeMillis();
+          claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
+        }
+        if (stream == nullptr) {
+          stream = process_context_->getContentRepository()->write(claim);
+        }
+        if (stream == nullptr) {
+          logger_->log_error("Stream is null");
+          rollback();
+          return;
+        }
+        if (stream->write(begin, len) != len) {
+          logger_->log_error("Error while writing");
           stream->closeStream();
-          std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
-          uint64_t endTime = getTimeMillis();
-          provenance_report_->modifyContent(flowFile, details, endTime - startTime);
-          flows.push_back(flowFile);
-
-          /* Reset these to start processing the next FlowFile with a clean slate */
-          flowFile.reset();
-          stream.reset();
-          claim.reset();
-
-          /* Skip delimiter */
-          begin = delimiterPos + 1;
+          throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
+        }
+
+        /* Create a FlowFile if we reached a delimiter */
+        if (delimiterPos == end) {
+          break;
         }
+        flowFile = std::static_pointer_cast<FlowFileRecord>(create());
+        flowFile->setSize(stream->getSize());
+        flowFile->setOffset(0);
+        flowFile->setResourceClaim(claim);
+        logging::LOG_DEBUG(logger_) << "Import offset " << flowFile->getOffset() << " length " << flowFile->getSize() << " content " << flowFile->getResourceClaim()->getContentFullPath()
+                                    << ", FlowFile UUID " << flowFile->getUUIDStr();
+        stream->closeStream();
+        std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
+        uint64_t endTime = getTimeMillis();
+        provenance_report_->modifyContent(flowFile, details, endTime - startTime);
+        flows.push_back(flowFile);
+
+        /* Reset these to start processing the next FlowFile with a clean slate */
+        flowFile.reset();
+        stream.reset();
+        claim.reset();
+
+        /* Skip delimiter */
+        begin = delimiterPos + 1;
       }
-    } catch (std::exception &exception) {
-      logger_->log_debug("Caught Exception %s", exception.what());
-      throw;
-    } catch (...) {
-      logger_->log_debug("Caught Exception during process session write");
-      throw;
     }
+  } catch (std::exception &exception) {
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
   } catch (...) {
-    if (flowFile != nullptr && claim != nullptr && flowFile->getResourceClaim() == claim) {
-      flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-      flowFile->clearResourceClaim();
-    }

Review comment:
       I just realized that `ResourceClaim` also holds a reference to a `StreamManager<ResourceClaim>`; it is beyond me why the `ResourceClaim` doesn't clean up after itself.
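
A minimal sketch of what "cleaning up after itself" could look like, assuming the claim may notify its manager from its destructor. `Manager` and `Claim` are hypothetical stand-ins, not the real `StreamManager<ResourceClaim>` and `ResourceClaim` classes:

```cpp
#include <cassert>
#include <memory>
#include <set>
#include <string>
#include <utility>

// Hypothetical stand-in for StreamManager<ResourceClaim>: tracks live content paths.
class Manager {
 public:
  void add(const std::string& path) { paths_.insert(path); }
  void remove(const std::string& path) { paths_.erase(path); }
  bool exists(const std::string& path) const { return paths_.count(path) > 0; }
 private:
  std::set<std::string> paths_;
};

// Hypothetical claim that registers itself on construction and
// unregisters on destruction, so callers never have to do it by hand.
class Claim {
 public:
  Claim(std::string path, std::shared_ptr<Manager> manager)
      : path_(std::move(path)), manager_(std::move(manager)) {
    manager_->add(path_);
  }
  Claim(const Claim&) = delete;
  Claim& operator=(const Claim&) = delete;
  ~Claim() { manager_->remove(path_); }
 private:
  std::string path_;
  std::shared_ptr<Manager> manager_;
};
```

With this shape the manual decrement/clear calls removed in the hunk above would have no equivalent at the call sites; whether the real repository can tolerate removal from a destructor is an open question here.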




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] arpadboda edited a comment on pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
arpadboda edited a comment on pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#issuecomment-655668294


   @adebreceni 
   There is a CI failure, might be related:
   ```
   /home/travis/build/apache/nifi-minifi-cpp/extensions/sftp/tests/FetchSFTPTests.cpp:274: FAILED:
     {Unknown expression after the reported line}
   due to unexpected exception with message:
     File Operation: No Content Claim existed for read
   ```
   
   It may be an issue in that test, or one in the processor's code that your changes just revealed.
   Please take a look!
   
   P.S.: this test case is pretty stable otherwise.





[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454833537



##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -248,35 +237,27 @@ void ProcessSession::penalize(const std::shared_ptr<core::FlowFile> &flow) {
 void ProcessSession::transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship) {
   logging::LOG_INFO(logger_) << "Transferring " << flow->getUUIDStr() << " from " << process_context_->getProcessorNode()->getName() << " to relationship " << relationship.getName();
   _transferRelationship[flow->getUUIDStr()] = relationship;
+  flow->setDeleted(false);

Review comment:
       During `commit` and `rollback` we check whether the items in `_deletedFlowFiles` have indeed stayed deleted, or whether a transfer or something else marked them for "resurrection".







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r448255306



##########
File path: libminifi/test/BufferReader.h
##########
@@ -0,0 +1,51 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_BUFFERREADER_H
+#define NIFI_MINIFI_CPP_BUFFERREADER_H
+
+#include "FlowFileRecord.h"
+
+class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
+ public:
+  explicit BufferReader(std::vector<uint8_t>& buffer) : buffer_(buffer){}
+  template<class Input>
+  int write(Input input, std::size_t len) {
+    uint8_t tmpBuffer[4096]{};
+    int total_read = 0;
+    do {
+      auto ret = input.read(tmpBuffer, std::min(len, sizeof(tmpBuffer)));
+      if (ret == 0) break;
+      if (ret < 0) return ret;
+      len -= ret;
+      total_read += ret;
+      auto prevSize = buffer_.size();
+      buffer_.resize(prevSize + ret);
+      std::move(tmpBuffer, tmpBuffer + ret, buffer_.data() + prevSize);
+    } while (len);

Review comment:
       done







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r452668104



##########
File path: libminifi/include/core/FlowFile.h
##########
@@ -35,9 +36,58 @@ namespace minifi {
 namespace core {
 
 class FlowFile : public core::Connectable, public ReferenceContainer {
+ private:
+  class FlowFileOwnedResourceClaimPtr{
+   public:
+    FlowFileOwnedResourceClaimPtr() = default;
+    explicit FlowFileOwnedResourceClaimPtr(const std::shared_ptr<ResourceClaim>& claim) : claim_(claim) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&& claim) : claim_(std::move(claim)) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) : claim_(ref.claim_) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }

Review comment:
       One place I know of that violates the "FlowFiles should always have a non-null claim" constraint is `ProcessSession::create`. I was thinking of some special singleton claims like `InvalidResourceClaim` and `EmptyResourceClaim`, or something else, but I think we should wait until after the `0.8.0` release to make the change.







[GitHub] [nifi-minifi-cpp] adamdebreceni edited a comment on pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni edited a comment on pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#issuecomment-661751938


   - the `Fix bug` is about how we have to decrement the claim's counter on behalf of the previously persisted instance in case we overwrote it in the repository
   - the `Fix segfault` is, well, fixing a segfault caused by [this](https://issues.apache.org/jira/browse/MINIFICPP-1300) i.e. the `FlowController` never dying, and thus not stopping the other threads
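
To illustrate the first point, here is a rough sketch of the bookkeeping, assuming a repository keyed by flow file id that holds one reference per persisted entry. `Claim`, `Repo` and `put` are hypothetical names for this example, not the actual repository API:

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>

// Hypothetical claim with a plain owner counter.
struct Claim {
  int owners = 0;
};

// Hypothetical repository: each persisted entry owns one reference to its claim.
class Repo {
 public:
  void put(const std::string& key, const std::shared_ptr<Claim>& claim) {
    // Increase first, so overwriting an entry with the same claim never drops to zero.
    claim->owners++;
    auto it = entries_.find(key);
    if (it != entries_.end()) {
      // Release the reference held on behalf of the previously persisted instance.
      it->second->owners--;
      it->second = claim;
    } else {
      entries_.emplace(key, claim);
    }
  }
 private:
  std::map<std::string, std::shared_ptr<Claim>> entries_;
};
```

Without the decrement in the overwrite branch, the old claim would keep a phantom owner and its content could never be reclaimed.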





[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r452134297



##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -478,15 +466,9 @@ void ProcessSession::import(std::string source, const std::shared_ptr<core::Flow
       throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
     }
   } catch (std::exception &exception) {
-    if (flow) {

Review comment:
       It previously checked whether the flow file had the claim that we just created, but an exception can only come from before the set happens, so the check doesn't make much sense. IMO there should be two global `ResourceClaim`s, the `InvalidResourceClaim` and the `EmptyResourceClaim`, so we could abandon the "empty content path signifies an empty resource claim" convention and have a non-null claim even for these "faulty" scenarios.
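
A sketch of the sentinel idea, assuming two shared singleton instances play the roles of `EmptyResourceClaim` and `InvalidResourceClaim`. `Claim` and `Flow` are simplified, hypothetical stand-ins, and mapping a null argument to the invalid sentinel is an assumption made only for this example:

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical, simplified stand-in for ResourceClaim.
class Claim {
 public:
  explicit Claim(std::string path) : path_(std::move(path)) {}
  const std::string& path() const { return path_; }

  // Shared sentinel meaning "no content yet".
  static const std::shared_ptr<Claim>& empty() {
    static const std::shared_ptr<Claim> instance = std::make_shared<Claim>("<empty>");
    return instance;
  }
  // Shared sentinel meaning "creating the content failed".
  static const std::shared_ptr<Claim>& invalid() {
    static const std::shared_ptr<Claim> instance = std::make_shared<Claim>("<invalid>");
    return instance;
  }
 private:
  std::string path_;
};

// Hypothetical flow file that never holds a null claim.
class Flow {
 public:
  Flow() : claim_(Claim::empty()) {}
  void setClaim(std::shared_ptr<Claim> claim) {
    if (claim) {
      claim_ = std::move(claim);
    } else {
      claim_ = Claim::invalid();  // assumption: null input marks a faulty scenario
    }
  }
  bool hasContent() const {
    return claim_ != Claim::empty() && claim_ != Claim::invalid();
  }
  const std::shared_ptr<Claim>& claim() const { return claim_; }
 private:
  std::shared_ptr<Claim> claim_;
};
```

Callers could then drop their `!= nullptr` checks and compare against the sentinels only where the distinction matters.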







[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r445652913



##########
File path: extensions/libarchive/BinFiles.cpp
##########
@@ -153,7 +156,7 @@ void BinManager::gatherReadyBins() {
 void BinManager::removeOldestBin() {
   std::lock_guard < std::mutex > lock(mutex_);
   uint64_t olddate = ULLONG_MAX;
-  std::unique_ptr < std::deque<std::unique_ptr<Bin>>>*oldqueue;
+  std::unique_ptr < std::deque<std::unique_ptr<Bin>>>* oldqueue;

Review comment:
       not your issue, but wtf: pointer to a smart ptr to a collection of smart ptrs to Bin.

##########
File path: libminifi/include/core/ContentRepository.h
##########
@@ -104,7 +104,7 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim> {
     if (count != count_map_.end() && count->second > 0) {
       count_map_[str] = count->second - 1;
     } else {
-	count_map_.erase(str);
+	    count_map_.erase(str);

Review comment:
       wrong indentation

##########
File path: libminifi/include/core/FlowFile.h
##########
@@ -30,9 +30,56 @@ namespace minifi {
 namespace core {
 
 class FlowFile : public core::Connectable, public ReferenceContainer {
+ private:
+  class FlowFileOwnedResourceClaimPtr{
+   public:
+    FlowFileOwnedResourceClaimPtr() = default;
+    explicit FlowFileOwnedResourceClaimPtr(const std::shared_ptr<ResourceClaim>& claim) : claim_(claim) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&& claim) : claim_(std::move(claim)) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) : claim_(ref.claim_) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(FlowFileOwnedResourceClaimPtr&& ref) : claim_(std::move(ref.claim_)) {
+      // taking ownership of claim, no need to increment/decrement
+    }
+    FlowFileOwnedResourceClaimPtr& operator=(const FlowFileOwnedResourceClaimPtr& ref) = delete;
+    FlowFileOwnedResourceClaimPtr& operator=(FlowFileOwnedResourceClaimPtr&& ref) = delete;
+
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const FlowFileOwnedResourceClaimPtr& ref) {
+      return set(owner, ref.claim_);
+    }
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const std::shared_ptr<ResourceClaim>& newClaim) {
+      auto oldClaim = claim_;
+      claim_ = newClaim;
+      // the order of increase/release is important
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+      if (oldClaim) owner.releaseClaim(oldClaim);
+      return *this;
+    }
+    const std::shared_ptr<ResourceClaim>& get() const {
+      return claim_;
+    }
+    const std::shared_ptr<ResourceClaim>& operator->() const {
+      return claim_;
+    }
+    operator bool() const noexcept {
+      return static_cast<bool>(claim_);
+    }
+    ~FlowFileOwnedResourceClaimPtr() {
+      // allow the owner FlowFile to manually release the claim
+      // while logging stuff and removing it from repositories
+      assert(!claim_);
+    }
+   private:
+    std::shared_ptr<ResourceClaim> claim_;
+  };
  public:
   FlowFile();
-  ~FlowFile();
+  virtual ~FlowFile();

Review comment:
       Add `override` instead for clarification.
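
For reference, a small sketch of what `override` buys on a destructor, assuming the base class already declares a virtual destructor (I have not verified that for `core::Connectable`). `Base` and `Derived` are hypothetical types:

```cpp
#include <cassert>
#include <memory>

struct Base {
  virtual ~Base() = default;
};

struct Derived : Base {
  explicit Derived(bool* flag) : flag_(flag) {}
  // `override` makes the compiler verify that Base's destructor is virtual;
  // if it were not, this line would fail to compile instead of silently slicing cleanup.
  ~Derived() override { *flag_ = true; }
  bool* flag_;
};

// Returns true if deleting through a Base pointer runs Derived's destructor.
bool destroysDerivedThroughBase() {
  bool derivedDestroyed = false;
  {
    std::unique_ptr<Base> p(new Derived(&derivedDestroyed));
  }
  return derivedDestroyed;
}
```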

##########
File path: libminifi/test/BufferReader.h
##########
@@ -0,0 +1,51 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_BUFFERREADER_H
+#define NIFI_MINIFI_CPP_BUFFERREADER_H
+
+#include "FlowFileRecord.h"
+
+class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
+ public:
+  explicit BufferReader(std::vector<uint8_t>& buffer) : buffer_(buffer){}
+  template<class Input>
+  int write(Input input, std::size_t len) {
+    uint8_t tmpBuffer[4096]{};
+    int total_read = 0;
+    do {
+      auto ret = input.read(tmpBuffer, std::min(len, sizeof(tmpBuffer)));
+      if (ret == 0) break;
+      if (ret < 0) return ret;
+      len -= ret;

Review comment:
       I think the code would be easier to understand if we didn't change the argument, but used a separate `read` or `remaining` variable. The fewer moving parts, the better. The generated code would most likely be the same.

##########
File path: libminifi/test/BufferReader.h
##########
@@ -0,0 +1,51 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_BUFFERREADER_H
+#define NIFI_MINIFI_CPP_BUFFERREADER_H
+
+#include "FlowFileRecord.h"
+
+class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
+ public:
+  explicit BufferReader(std::vector<uint8_t>& buffer) : buffer_(buffer){}
+  template<class Input>
+  int write(Input input, std::size_t len) {
+    uint8_t tmpBuffer[4096]{};
+    int total_read = 0;
+    do {
+      auto ret = input.read(tmpBuffer, std::min(len, sizeof(tmpBuffer)));
+      if (ret == 0) break;
+      if (ret < 0) return ret;
+      len -= ret;
+      total_read += ret;
+      auto prevSize = buffer_.size();
+      buffer_.resize(prevSize + ret);
+      std::move(tmpBuffer, tmpBuffer + ret, buffer_.data() + prevSize);
+    } while (len);

Review comment:
       I'm not a fan of utilizing implicit integer -> bool conversion. Also, a pretest cycle would avoid calling `input.read` on `len == 0`.
   ```suggestion
       } while (len > 0);
   ```
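
Putting this remark and the earlier `remaining` remark together, the loop could look roughly like the sketch below. `FakeInput` and `readInto` are hypothetical names for illustration; the only thing assumed about the real stream is a `read(uint8_t*, size)` member returning the byte count or a negative error:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical in-memory stand-in for the stream type.
struct FakeInput {
  explicit FakeInput(std::vector<uint8_t> d) : data(std::move(d)), pos(0) {}
  int read(uint8_t* out, std::size_t len) {
    const std::size_t n = std::min(len, data.size() - pos);
    std::copy_n(data.begin() + static_cast<std::ptrdiff_t>(pos), n, out);
    pos += n;
    return static_cast<int>(n);
  }
  std::vector<uint8_t> data;
  std::size_t pos;
};

// Appends up to `len` bytes from `input` to `buffer`.
// Returns the number of bytes read, or the negative error from `read`.
template<class Input>
int readInto(Input& input, std::size_t len, std::vector<uint8_t>& buffer) {
  uint8_t tmpBuffer[4096]{};
  std::size_t remaining = len;  // the argument itself is never modified
  int total_read = 0;
  while (remaining > 0) {  // pretest loop: no read call at all when len == 0
    const auto ret = input.read(tmpBuffer, std::min(remaining, sizeof(tmpBuffer)));
    if (ret == 0) break;
    if (ret < 0) return ret;
    remaining -= static_cast<std::size_t>(ret);
    total_read += ret;
    buffer.insert(buffer.end(), tmpBuffer, tmpBuffer + ret);
  }
  return total_read;
}
```

Using `buffer.insert` instead of `resize` plus `std::move` is a side simplification in this sketch, not something the review asked for.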

##########
File path: libminifi/test/BufferReader.h
##########
@@ -0,0 +1,51 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_BUFFERREADER_H
+#define NIFI_MINIFI_CPP_BUFFERREADER_H
+
+#include "FlowFileRecord.h"
+
+class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
+ public:
+  explicit BufferReader(std::vector<uint8_t>& buffer) : buffer_(buffer){}
+  template<class Input>
+  int write(Input input, std::size_t len) {

Review comment:
       What is the type of `input` meant to be? I'd like to see some type or trait/concept checks.
   All I can see is that it has to have a `read` member function that takes a uint8_t array and a size.
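
One possible shape for such a check, as a C++11-compatible sketch. `is_readable_input`, `GoodInput`, `BadInput` and `readAll` are hypothetical names; the trait only encodes the requirement visible in the diff, a `read` member taking a `uint8_t*` and a size:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <type_traits>
#include <utility>

// Primary template: types without a suitable `read` member are rejected.
template<class T, class = void>
struct is_readable_input : std::false_type {};

// Selected only if `t.read(uint8_t*, std::size_t)` is well-formed
// and its result is convertible to int.
template<class T>
struct is_readable_input<T, typename std::enable_if<std::is_convertible<
    decltype(std::declval<T&>().read(std::declval<uint8_t*>(), std::declval<std::size_t>())),
    int>::value>::type> : std::true_type {};

struct GoodInput {
  int read(uint8_t*, std::size_t) { return 0; }
};
struct BadInput {
  void close() {}
};

template<class Input>
int readAll(Input& input, std::size_t len) {
  static_assert(is_readable_input<Input>::value,
                "Input must provide int read(uint8_t*, std::size_t)");
  uint8_t tmp[16]{};
  return input.read(tmp, len < sizeof(tmp) ? len : sizeof(tmp));
}
```

Passing a `BadInput` to `readAll` would then fail with the `static_assert` message rather than with an error deep inside the loop body.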

##########
File path: extensions/libarchive/BinFiles.cpp
##########
@@ -70,6 +71,8 @@ void BinFiles::initialize() {
   relationships.insert(Original);
   relationships.insert(Failure);
   setSupportedRelationships(relationships);
+
+  out_going_connections_[Self.getName()].insert(shared_from_this());

Review comment:
       This is a shared_ptr cycle.
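
To make the concern concrete, a minimal sketch of the problem and one possible way around it, assuming a non-owning `std::weak_ptr` is acceptable for the self-connection. `Node` is a hypothetical type, not the real processor class:

```cpp
#include <cassert>
#include <memory>
#include <set>

struct Node : std::enable_shared_from_this<Node> {
  // Mirrors the pattern in the diff: the object stores a strong reference to itself.
  void linkToSelfStrong() { strong_.insert(shared_from_this()); }
  // Alternative: a weak reference does not keep the object alive.
  void linkToSelfWeak() { weak_ = shared_from_this(); }

  std::set<std::shared_ptr<Node>> strong_;
  std::weak_ptr<Node> weak_;
};

// Returns true if the node is destroyed once the last external shared_ptr is gone.
bool destroyedAfterRelease(bool useWeak) {
  std::weak_ptr<Node> observer;
  {
    auto node = std::make_shared<Node>();
    observer = node;
    if (useWeak) {
      node->linkToSelfWeak();
    } else {
      node->linkToSelfStrong();
    }
  }
  const bool destroyed = observer.expired();
  // Break the cycle by hand so that this sketch itself does not leak.
  if (auto leaked = observer.lock()) leaked->strong_.clear();
  return destroyed;
}
```

In the strong case the object outlives every external owner, which is exactly the leak the self-insert into `out_going_connections_` would cause unless something clears it explicitly.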







[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454913192



##########
File path: libminifi/include/core/FlowFile.h
##########
@@ -35,9 +36,58 @@ namespace minifi {
 namespace core {
 
 class FlowFile : public core::Connectable, public ReferenceContainer {
+ private:
+  class FlowFileOwnedResourceClaimPtr{
+   public:
+    FlowFileOwnedResourceClaimPtr() = default;
+    explicit FlowFileOwnedResourceClaimPtr(const std::shared_ptr<ResourceClaim>& claim) : claim_(claim) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&& claim) : claim_(std::move(claim)) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) : claim_(ref.claim_) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }

Review comment:
       Could you add this as a comment to the code? I think this is helpful information to future readers.

##########
File path: libminifi/include/core/Repository.h
##########
@@ -228,6 +232,8 @@ class Repository : public virtual core::SerializableComponent, public core::Trac
   Repository &operator=(const Repository &parent) = delete;
 
  protected:
+  std::map<std::string, std::shared_ptr<core::Connectable>> containers;

Review comment:
       Could you add this as a comment to the code? I think this is helpful information to future readers.







[GitHub] [nifi-minifi-cpp] szaszm commented on pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
szaszm commented on pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#issuecomment-660132934


   Could you check the CI failures? 





[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r448254975



##########
File path: libminifi/test/BufferReader.h
##########
@@ -0,0 +1,51 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_BUFFERREADER_H
+#define NIFI_MINIFI_CPP_BUFFERREADER_H
+
+#include "FlowFileRecord.h"
+
+class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
+ public:
+  explicit BufferReader(std::vector<uint8_t>& buffer) : buffer_(buffer){}
+  template<class Input>
+  int write(Input input, std::size_t len) {
+    uint8_t tmpBuffer[4096]{};
+    int total_read = 0;
+    do {
+      auto ret = input.read(tmpBuffer, std::min(len, sizeof(tmpBuffer)));
+      if (ret == 0) break;
+      if (ret < 0) return ret;
+      len -= ret;

Review comment:
       done







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454831606



##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -211,15 +206,6 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(const std::shared_ptr<core
 
 void ProcessSession::remove(const std::shared_ptr<core::FlowFile> &flow) {
   flow->setDeleted(true);
-  if (flow->getResourceClaim() != nullptr) {
-    flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-    logger_->log_debug("Auto terminated %s %" PRIu64 " %s", flow->getResourceClaim()->getContentFullPath(), flow->getResourceClaim()->getFlowFileRecordOwnedCount(), flow->getUUIDStr());
-  } else {
-    logger_->log_debug("Flow does not contain content. no resource claim to decrement.");

Review comment:
       added comment where the delete actually happens in `ProcessSession::commit`







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454833537



##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -248,35 +237,27 @@ void ProcessSession::penalize(const std::shared_ptr<core::FlowFile> &flow) {
 void ProcessSession::transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship) {
   logging::LOG_INFO(logger_) << "Transferring " << flow->getUUIDStr() << " from " << process_context_->getProcessorNode()->getName() << " to relationship " << relationship.getName();
   _transferRelationship[flow->getUUIDStr()] = relationship;
+  flow->setDeleted(false);

Review comment:
       During `commit` and `rollback` we check whether the items in `_deletedFlowFiles` have indeed stayed deleted, or whether a transfer or add has marked them for "resurrection".
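
To illustrate the mechanism described in this comment, here is a minimal, self-contained sketch of deferred deletion with "resurrection". The `Sketch*` types and the `demoResurrection` helper are hypothetical stand-ins invented for this example, not the actual `ProcessSession` code; the real implementation also has to deal with repositories, `add` and `rollback`.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for core::FlowFile; only the deleted flag matters here.
struct SketchFlowFile {
  explicit SketchFlowFile(std::string id_) : id(std::move(id_)) {}
  std::string id;
  bool deleted = false;
};

// Hypothetical stand-in for ProcessSession, reduced to the bookkeeping under discussion.
class SketchSession {
 public:
  void remove(const std::shared_ptr<SketchFlowFile>& flow) {
    assert(flow);
    flow->deleted = true;
    deleted_.push_back(flow);  // only a candidate; the decision is made at commit
  }
  void transfer(const std::shared_ptr<SketchFlowFile>& flow) {
    assert(flow);
    flow->deleted = false;  // "resurrects" a FlowFile that was removed earlier
  }
  // Returns the ids of the FlowFiles that are still marked deleted at commit time.
  std::vector<std::string> commit() {
    std::vector<std::string> purged;
    for (const auto& flow : deleted_) {
      if (flow->deleted) purged.push_back(flow->id);
    }
    deleted_.clear();
    return purged;
  }

 private:
  std::vector<std::shared_ptr<SketchFlowFile>> deleted_;
};

// Removes two FlowFiles, transfers the first one again, then commits.
inline std::vector<std::string> demoResurrection() {
  SketchSession session;
  auto a = std::make_shared<SketchFlowFile>("a");
  auto b = std::make_shared<SketchFlowFile>("b");
  session.remove(a);
  session.remove(b);
  session.transfer(a);
  return session.commit();
}
```

With this bookkeeping, a `remove` followed by a `transfer` on the same FlowFile leaves it alive, and only the FlowFiles still flagged at commit time are purged.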







[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r452115628



##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -478,15 +466,9 @@ void ProcessSession::import(std::string source, const std::shared_ptr<core::Flow
       throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
     }
   } catch (std::exception &exception) {
-    if (flow) {

Review comment:
       Where does it happen now? 
   I guess this was here for a reason. 







[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r456470888



##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -540,111 +481,97 @@ void ProcessSession::import(const std::string& source, std::vector<std::shared_p
 
   std::vector<uint8_t> buffer(getpagesize());
   try {
-    try {
-      std::ifstream input{source, std::ios::in | std::ios::binary};
-      logger_->log_debug("Opening %s", source);
-      if (!input.is_open() || !input.good()) {
-        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+    std::ifstream input{source, std::ios::in | std::ios::binary};
+    logger_->log_debug("Opening %s", source);
+    if (!input.is_open() || !input.good()) {
+      throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+    }
+    if (offset != 0U) {
+      input.seekg(offset, std::ifstream::beg);
+      if (!input.good()) {
+        logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
+        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
       }
-      if (offset != 0U) {
-        input.seekg(offset, std::ifstream::beg);
-        if (!input.good()) {
-          logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
-          throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
-        }
+    }
+    uint64_t startTime = 0U;
+    while (input.good()) {
+      input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
+      std::streamsize read = input.gcount();
+      if (read < 0) {
+        throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
       }
-      uint64_t startTime = 0U;
-      while (input.good()) {
-        input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
-        std::streamsize read = input.gcount();
-        if (read < 0) {
-          throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
-        }
-        if (read == 0) {
-          logger_->log_trace("Finished reading input %s", source);
+      if (read == 0) {
+        logger_->log_trace("Finished reading input %s", source);
+        break;
+      } else {
+        logging::LOG_TRACE(logger_) << "Read input of " << read;
+      }
+      uint8_t* begin = buffer.data();
+      uint8_t* end = begin + read;
+      while (true) {
+        startTime = getTimeMillis();
+        uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
+        const auto len = gsl::narrow<int>(delimiterPos - begin);
+
+        logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
+        /*
+         * We do not want to process the rest of the buffer after the last delimiter if
+         *  - we have reached EOF in the file (we would discard it anyway)
+         *  - there is nothing to process (the last character in the buffer is a delimiter)
+         */
+        if (delimiterPos == end && (input.eof() || len == 0)) {
           break;
-        } else {
-          logging::LOG_TRACE(logger_) << "Read input of " << read;
         }
-        uint8_t* begin = buffer.data();
-        uint8_t* end = begin + read;
-        while (true) {
-          startTime = getTimeMillis();
-          uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
-          const auto len = gsl::narrow<int>(delimiterPos - begin);
-
-          logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
-          /*
-           * We do not want to process the rest of the buffer after the last delimiter if
-           *  - we have reached EOF in the file (we would discard it anyway)
-           *  - there is nothing to process (the last character in the buffer is a delimiter)
-           */
-          if (delimiterPos == end && (input.eof() || len == 0)) {
-            break;
-          }
-
-          /* Create claim and stream if needed and append data */
-          if (claim == nullptr) {
-            startTime = getTimeMillis();
-            claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
-          }
-          if (stream == nullptr) {
-            stream = process_context_->getContentRepository()->write(claim);
-          }
-          if (stream == nullptr) {
-            logger_->log_error("Stream is null");
-            rollback();
-            return;
-          }
-          if (stream->write(begin, len) != len) {
-            logger_->log_error("Error while writing");
-            stream->closeStream();
-            throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
-          }
 
-          /* Create a FlowFile if we reached a delimiter */
-          if (delimiterPos == end) {
-            break;
-          }
-          flowFile = std::static_pointer_cast<FlowFileRecord>(create());
-          flowFile->setSize(stream->getSize());
-          flowFile->setOffset(0);
-          if (flowFile->getResourceClaim() != nullptr) {
-            /* Remove the old claim */
-            flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-            flowFile->clearResourceClaim();
-          }
-          flowFile->setResourceClaim(claim);
-          claim->increaseFlowFileRecordOwnedCount();
-          logging::LOG_DEBUG(logger_) << "Import offset " << flowFile->getOffset() << " length " << flowFile->getSize() << " content " << flowFile->getResourceClaim()->getContentFullPath()
-                                      << ", FlowFile UUID " << flowFile->getUUIDStr();
+        /* Create claim and stream if needed and append data */
+        if (claim == nullptr) {
+          startTime = getTimeMillis();
+          claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
+        }
+        if (stream == nullptr) {
+          stream = process_context_->getContentRepository()->write(claim);
+        }
+        if (stream == nullptr) {
+          logger_->log_error("Stream is null");
+          rollback();
+          return;
+        }
+        if (stream->write(begin, len) != len) {
+          logger_->log_error("Error while writing");
           stream->closeStream();
-          std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
-          uint64_t endTime = getTimeMillis();
-          provenance_report_->modifyContent(flowFile, details, endTime - startTime);
-          flows.push_back(flowFile);
-
-          /* Reset these to start processing the next FlowFile with a clean slate */
-          flowFile.reset();
-          stream.reset();
-          claim.reset();
-
-          /* Skip delimiter */
-          begin = delimiterPos + 1;
+          throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
+        }
+
+        /* Create a FlowFile if we reached a delimiter */
+        if (delimiterPos == end) {
+          break;
         }
+        flowFile = std::static_pointer_cast<FlowFileRecord>(create());
+        flowFile->setSize(stream->getSize());
+        flowFile->setOffset(0);
+        flowFile->setResourceClaim(claim);
+        logging::LOG_DEBUG(logger_) << "Import offset " << flowFile->getOffset() << " length " << flowFile->getSize() << " content " << flowFile->getResourceClaim()->getContentFullPath()
+                                    << ", FlowFile UUID " << flowFile->getUUIDStr();
+        stream->closeStream();
+        std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
+        uint64_t endTime = getTimeMillis();
+        provenance_report_->modifyContent(flowFile, details, endTime - startTime);
+        flows.push_back(flowFile);
+
+        /* Reset these to start processing the next FlowFile with a clean slate */
+        flowFile.reset();
+        stream.reset();
+        claim.reset();
+
+        /* Skip delimiter */
+        begin = delimiterPos + 1;
       }
-    } catch (std::exception &exception) {
-      logger_->log_debug("Caught Exception %s", exception.what());
-      throw;
-    } catch (...) {
-      logger_->log_debug("Caught Exception during process session write");
-      throw;
     }
+  } catch (std::exception &exception) {
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
   } catch (...) {
-    if (flowFile != nullptr && claim != nullptr && flowFile->getResourceClaim() == claim) {
-      flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-      flowFile->clearResourceClaim();
-    }

Review comment:
       good point







[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r448277514



##########
File path: libminifi/include/core/FlowFile.h
##########
@@ -30,9 +30,56 @@ namespace minifi {
 namespace core {
 
 class FlowFile : public core::Connectable, public ReferenceContainer {
+ private:
+  class FlowFileOwnedResourceClaimPtr{
+   public:
+    FlowFileOwnedResourceClaimPtr() = default;
+    explicit FlowFileOwnedResourceClaimPtr(const std::shared_ptr<ResourceClaim>& claim) : claim_(claim) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&& claim) : claim_(std::move(claim)) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) : claim_(ref.claim_) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(FlowFileOwnedResourceClaimPtr&& ref) : claim_(std::move(ref.claim_)) {
+      // taking ownership of claim, no need to increment/decrement
+    }
+    FlowFileOwnedResourceClaimPtr& operator=(const FlowFileOwnedResourceClaimPtr& ref) = delete;
+    FlowFileOwnedResourceClaimPtr& operator=(FlowFileOwnedResourceClaimPtr&& ref) = delete;
+
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const FlowFileOwnedResourceClaimPtr& ref) {
+      return set(owner, ref.claim_);
+    }
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const std::shared_ptr<ResourceClaim>& newClaim) {
+      auto oldClaim = claim_;
+      claim_ = newClaim;
+      // the order of increase/release is important
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+      if (oldClaim) owner.releaseClaim(oldClaim);
+      return *this;
+    }
+    const std::shared_ptr<ResourceClaim>& get() const {
+      return claim_;
+    }
+    const std::shared_ptr<ResourceClaim>& operator->() const {
+      return claim_;
+    }
+    operator bool() const noexcept {
+      return static_cast<bool>(claim_);
+    }
+    ~FlowFileOwnedResourceClaimPtr() {
+      // allow the owner FlowFile to manually release the claim
+      // while logging stuff and removing it from repositories
+      assert(!claim_);
+    }
+   private:
+    std::shared_ptr<ResourceClaim> claim_;
+  };
  public:
   FlowFile();
-  ~FlowFile();
+  virtual ~FlowFile();

Review comment:
       The supporting guideline, which only refers to "virtual functions" without mentioning destructors:
   https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#Rh-override
   
   Another guideline, whose example suggests that destructors are not excluded from the above rule:
   https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#c130-for-making-deep-copies-of-polymorphic-classes-prefer-a-virtual-clone-function-instead-of-copy-constructionassignment
   
   Based on these, my standpoint remains. A logical rationale could be that a destructor definition implicitly calls the base class destructors after its own body has run, so it is actually overriding the base class implementation with a new one that happens to call the base class implementation at the end.
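
For reference, a small sketch of what marking a destructor `override` does in practice. The `Sketch*` names and the `destroyThroughBasePointer` helper are hypothetical; the only claims made are standard C++ behaviour: `override` is accepted on a destructor when the base destructor is virtual, and destroying an object through a base pointer runs the derived destructor first, then the base one.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <type_traits>

struct SketchBase {
  explicit SketchBase(std::string& log) : log_(log) {}
  virtual ~SketchBase() { log_ += "Base;"; }

 protected:
  std::string& log_;  // records the order in which destructors run
};

struct SketchDerived : SketchBase {
  explicit SketchDerived(std::string& log) : SketchBase(log) {}
  // `override` compiles here only because ~SketchBase() is virtual.
  ~SketchDerived() override { log_ += "Derived;"; }
};

static_assert(std::has_virtual_destructor<SketchBase>::value,
              "the base destructor must be virtual for `override` to be accepted");

// Destroys a SketchDerived through a base pointer and returns the recorded order.
inline std::string destroyThroughBasePointer() {
  std::string log;
  {
    std::unique_ptr<SketchBase> ptr(new SketchDerived(log));
    assert(log.empty());  // nothing has been destroyed yet
  }
  return log;
}
```

If the `virtual` keyword were ever dropped from the base destructor, the `override` on the derived destructor would turn that into a compile error instead of silent undefined behaviour on deletion through a base pointer.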







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454839615



##########
File path: libminifi/test/persistence-tests/PersistenceTests.cpp
##########
@@ -0,0 +1,218 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+#include <map>
+#include <memory>
+#include <string>
+#include <thread>
+
+#include "core/Core.h"
+#include "core/repository/AtomicRepoEntries.h"
+#include "core/RepositoryFactory.h"
+#include "FlowFileRecord.h"
+#include "FlowFileRepository.h"
+#include "properties/Configure.h"
+#include "../unit/ProvenanceTestHelper.h"
+#include "../TestBase.h"
+#include "../../extensions/libarchive/MergeContent.h"
+#include "../test/BufferReader.h"
+
+using Connection = minifi::Connection;
+using MergeContent = minifi::processors::MergeContent;
+
+struct TestFlow{
+  TestFlow(const std::shared_ptr<core::repository::FlowFileRepository>& ff_repository, const std::shared_ptr<core::ContentRepository>& content_repo, const std::shared_ptr<core::Repository>& prov_repo)
+      : ff_repository(ff_repository), content_repo(content_repo), prov_repo(prov_repo) {
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+
+    // setup MERGE processor
+    {
+      merge = std::make_shared<MergeContent>("MergeContent", mergeProcUUID());
+      merge->initialize();
+      merge->setAutoTerminatedRelationships({{"original", "d"}});
+
+      merge->setProperty(MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+      merge->setProperty(MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK);
+      merge->setProperty(MergeContent::DelimiterStrategy, DELIMITER_STRATEGY_TEXT);
+      merge->setProperty(MergeContent::MinEntries, "3");
+      merge->setProperty(MergeContent::Header, "_Header_");
+      merge->setProperty(MergeContent::Footer, "_Footer_");
+      merge->setProperty(MergeContent::Demarcator, "_Demarcator_");
+      merge->setProperty(MergeContent::MaxBinAge, "1 h");
+
+      std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(merge);
+      mergeContext = std::make_shared<core::ProcessContext>(node, controller_services_provider, prov_repo, ff_repository, content_repo);
+    }
+
+    // setup INPUT processor
+    {
+      inputProcessor = std::make_shared<core::Processor>("source", inputProcUUID());
+      std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(inputProcessor);
+      inputContext = std::make_shared<core::ProcessContext>(node, controller_services_provider, prov_repo,
+                                                            ff_repository, content_repo);
+    }
+
+    // setup Input Connection
+    {
+      input = std::make_shared<Connection>(ff_repository, content_repo, "Input", inputConnUUID());
+      input->setRelationship({"input", "d"});
+      input->setDestinationUUID(mergeProcUUID());
+      input->setSourceUUID(inputProcUUID());
+      inputProcessor->addConnection(input);
+    }
+
+    // setup Output Connection
+    {
+      output = std::make_shared<Connection>(ff_repository, content_repo, "Output", outputConnUUID());
+      output->setRelationship(MergeContent::Merge);
+      output->setSourceUUID(mergeProcUUID());
+    }
+
+    // setup ProcessGroup
+    {
+      root = std::make_shared<core::ProcessGroup>(core::ProcessGroupType::ROOT_PROCESS_GROUP, "root");
+      root->addProcessor(merge);
+      root->addConnection(input);
+      root->addConnection(output);
+    }
+
+    // prepare Merge Processor for execution
+    merge->setScheduledState(core::ScheduledState::RUNNING);
+    merge->onSchedule(mergeContext.get(), new core::ProcessSessionFactory(mergeContext));
+  }
+  void write(const std::string& data) {
+    minifi::io::DataStream stream(reinterpret_cast<const uint8_t*>(data.c_str()), data.length());
+    core::ProcessSession sessionGenFlowFile(inputContext);
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
+    sessionGenFlowFile.importFrom(stream, flow);
+    sessionGenFlowFile.transfer(flow, {"input", "d"});
+    sessionGenFlowFile.commit();

Review comment:
       done







[GitHub] [nifi-minifi-cpp] arpadboda closed pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
arpadboda closed pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807


   





[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r448289515



##########
File path: libminifi/include/core/FlowFile.h
##########
@@ -30,9 +30,56 @@ namespace minifi {
 namespace core {
 
 class FlowFile : public core::Connectable, public ReferenceContainer {
+ private:
+  class FlowFileOwnedResourceClaimPtr{
+   public:
+    FlowFileOwnedResourceClaimPtr() = default;
+    explicit FlowFileOwnedResourceClaimPtr(const std::shared_ptr<ResourceClaim>& claim) : claim_(claim) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&& claim) : claim_(std::move(claim)) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) : claim_(ref.claim_) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(FlowFileOwnedResourceClaimPtr&& ref) : claim_(std::move(ref.claim_)) {
+      // taking ownership of claim, no need to increment/decrement
+    }
+    FlowFileOwnedResourceClaimPtr& operator=(const FlowFileOwnedResourceClaimPtr& ref) = delete;
+    FlowFileOwnedResourceClaimPtr& operator=(FlowFileOwnedResourceClaimPtr&& ref) = delete;
+
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const FlowFileOwnedResourceClaimPtr& ref) {
+      return set(owner, ref.claim_);
+    }
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const std::shared_ptr<ResourceClaim>& newClaim) {
+      auto oldClaim = claim_;
+      claim_ = newClaim;
+      // the order of increase/release is important
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+      if (oldClaim) owner.releaseClaim(oldClaim);
+      return *this;
+    }
+    const std::shared_ptr<ResourceClaim>& get() const {
+      return claim_;
+    }
+    const std::shared_ptr<ResourceClaim>& operator->() const {
+      return claim_;
+    }
+    operator bool() const noexcept {
+      return static_cast<bool>(claim_);
+    }
+    ~FlowFileOwnedResourceClaimPtr() {
+      // allow the owner FlowFile to manually release the claim
+      // while logging stuff and removing it from repositories
+      assert(!claim_);
+    }
+   private:
+    std::shared_ptr<ResourceClaim> claim_;
+  };
  public:
   FlowFile();
-  ~FlowFile();
+  virtual ~FlowFile();

Review comment:
       done







[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r459389896



##########
File path: libminifi/include/core/Repository.h
##########
@@ -31,6 +31,7 @@
 #include <string>
 #include <thread>
 #include <vector>
+#include "core/Deprecated.h"

Review comment:
       This header is no longer needed







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r448208397



##########
File path: libminifi/include/core/FlowFile.h
##########
@@ -30,9 +30,56 @@ namespace minifi {
 namespace core {
 
 class FlowFile : public core::Connectable, public ReferenceContainer {
+ private:
+  class FlowFileOwnedResourceClaimPtr{
+   public:
+    FlowFileOwnedResourceClaimPtr() = default;
+    explicit FlowFileOwnedResourceClaimPtr(const std::shared_ptr<ResourceClaim>& claim) : claim_(claim) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&& claim) : claim_(std::move(claim)) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) : claim_(ref.claim_) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(FlowFileOwnedResourceClaimPtr&& ref) : claim_(std::move(ref.claim_)) {
+      // taking ownership of claim, no need to increment/decrement
+    }
+    FlowFileOwnedResourceClaimPtr& operator=(const FlowFileOwnedResourceClaimPtr& ref) = delete;
+    FlowFileOwnedResourceClaimPtr& operator=(FlowFileOwnedResourceClaimPtr&& ref) = delete;
+
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const FlowFileOwnedResourceClaimPtr& ref) {
+      return set(owner, ref.claim_);
+    }
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const std::shared_ptr<ResourceClaim>& newClaim) {
+      auto oldClaim = claim_;
+      claim_ = newClaim;
+      // the order of increase/release is important
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+      if (oldClaim) owner.releaseClaim(oldClaim);
+      return *this;
+    }
+    const std::shared_ptr<ResourceClaim>& get() const {
+      return claim_;
+    }
+    const std::shared_ptr<ResourceClaim>& operator->() const {
+      return claim_;
+    }
+    operator bool() const noexcept {
+      return static_cast<bool>(claim_);
+    }
+    ~FlowFileOwnedResourceClaimPtr() {
+      // allow the owner FlowFile to manually release the claim
+      // while logging stuff and removing it from repositories
+      assert(!claim_);
+    }
+   private:
+    std::shared_ptr<ResourceClaim> claim_;
+  };
  public:
   FlowFile();
-  ~FlowFile();
+  virtual ~FlowFile();

Review comment:
       since virtual destructors have different semantics from other virtual functions (they never actually override the base destructor but they are chained) they shouldn't be marked `override` imo
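
A minimal sketch of the difference in run-time behaviour referred to here, using hypothetical `Chain*` types and a hypothetical `traceCalls` helper: an overridden ordinary virtual function replaces the base version, whereas the base destructor always runs after the derived one. Note that the C++ standard does word this as the derived destructor overriding a virtual base destructor, so the sketch only illustrates the chaining and does not settle the style question.

```cpp
#include <cassert>
#include <string>

struct ChainBase {
  explicit ChainBase(std::string& log) : log_(log) {}
  virtual ~ChainBase() { log_ += "~Base;"; }
  virtual void describe() { log_ += "Base::describe;"; }

 protected:
  std::string& log_;  // records which functions actually ran
};

struct ChainDerived : ChainBase {
  explicit ChainDerived(std::string& log) : ChainBase(log) {}
  // Implicitly virtual; the base destructor still runs after this body.
  ~ChainDerived() { log_ += "~Derived;"; }
  // Replaces the base version; Base::describe is not called.
  void describe() override { log_ += "Derived::describe;"; }
};

// Calls describe() through a base reference, then lets the object go out of scope.
inline std::string traceCalls() {
  std::string log;
  {
    ChainDerived object(log);
    ChainBase& ref = object;
    ref.describe();
    assert(log == "Derived::describe;");  // only the derived version ran
  }
  return log;
}
```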







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r459426459



##########
File path: libminifi/include/core/Repository.h
##########
@@ -31,6 +31,7 @@
 #include <string>
 #include <thread>
 #include <vector>
+#include "core/Deprecated.h"

Review comment:
       👍 







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r452651022



##########
File path: libminifi/src/core/ProcessGroup.cpp
##########
@@ -353,6 +353,10 @@ void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connecta
     connectionMap[connection->getUUIDStr()] = connection;
     connectionMap[connection->getName()] = connection;
   }
+  for (auto processor : processors_) {
+    // processors can also own FlowFiles
+    connectionMap[processor->getUUIDStr()] = processor;

Review comment:
       Agreed. The main source of confusion is that `getConnections` expects a `std::map<std::string, std::shared_ptr<Connectable>>&`, and `Processor` is a `Connectable`.
   
   Note that there is another `getConnections` right above this method, which expects a `std::map<std::string, std::shared_ptr<Connection>>&`. There should be a separate interface indicating FlowFile ownership capabilities, and `Connection` should not be a `Connectable`.
   
   For now I'll create a new method, something like `getFlowFileContainers`, to better indicate what it's used for.
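
One possible shape for such an interface, as a rough sketch only. `SketchFlowFileContainer`, the simplified `SketchConnection`, `SketchProcessor` and `SketchProcessGroup` classes, and the helper `countContainers` are all hypothetical; only the method name `getFlowFileContainers` comes from this comment, and its real signature may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical interface (not part of MiNiFi): anything that can hold FlowFiles.
class SketchFlowFileContainer {
 public:
  explicit SketchFlowFileContainer(std::string uuid) : uuid_(std::move(uuid)) {}
  virtual ~SketchFlowFileContainer() = default;
  const std::string& getUUIDStr() const { return uuid_; }

 private:
  std::string uuid_;
};

// Simplified stand-ins for Connection and Processor.
class SketchConnection : public SketchFlowFileContainer {
 public:
  using SketchFlowFileContainer::SketchFlowFileContainer;
};
class SketchProcessor : public SketchFlowFileContainer {
 public:
  using SketchFlowFileContainer::SketchFlowFileContainer;
};

class SketchProcessGroup {
 public:
  void addConnection(std::shared_ptr<SketchConnection> c) { connections_.push_back(std::move(c)); }
  void addProcessor(std::shared_ptr<SketchProcessor> p) { processors_.push_back(std::move(p)); }

  // Collects every FlowFile holder by UUID, without presenting processors as connections.
  void getFlowFileContainers(std::map<std::string, std::shared_ptr<SketchFlowFileContainer>>& containers) const {
    for (const auto& connection : connections_) containers[connection->getUUIDStr()] = connection;
    for (const auto& processor : processors_) containers[processor->getUUIDStr()] = processor;
  }

 private:
  std::vector<std::shared_ptr<SketchConnection>> connections_;
  std::vector<std::shared_ptr<SketchProcessor>> processors_;
};

// Builds a group with one connection and one processor and counts its containers.
inline std::size_t countContainers() {
  SketchProcessGroup group;
  group.addConnection(std::make_shared<SketchConnection>("connection-uuid"));
  group.addProcessor(std::make_shared<SketchProcessor>("processor-uuid"));
  std::map<std::string, std::shared_ptr<SketchFlowFileContainer>> containers;
  group.getFlowFileContainers(containers);
  assert(containers.count("processor-uuid") == 1);  // processors are included too
  return containers.size();
}
```

The point of the separate interface is that the map's value type states the intent (things that hold FlowFiles) rather than reusing `Connectable`, which is what made the original `getConnections` overload confusing.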







[GitHub] [nifi-minifi-cpp] arpadboda commented on pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
arpadboda commented on pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#issuecomment-655668294


   @adebreceni 
   There is a CI failure that might be related:
   ```
   /home/travis/build/apache/nifi-minifi-cpp/extensions/sftp/tests/FetchSFTPTests.cpp:274: FAILED:
     {Unknown expression after the reported line}
   due to unexpected exception with message:
     File Operation: No Content Claim existed for read
   ```
   
   It could be an issue in that test, or an issue in that processor's code that your changes just revealed.
   Please take a look!





[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r452651022



##########
File path: libminifi/src/core/ProcessGroup.cpp
##########
@@ -353,6 +353,10 @@ void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connecta
     connectionMap[connection->getUUIDStr()] = connection;
     connectionMap[connection->getName()] = connection;
   }
+  for (auto processor : processors_) {
+    // processors can also own FlowFiles
+    connectionMap[processor->getUUIDStr()] = processor;

Review comment:
       agree, the main source of confusion is that `getConnections` expects a `std::map<std::string, std::shared_ptr<Connectable>>&` and `Processor` is a `Connectable`
   
   notice that there is a `getConnections` right above this method which expects a `std::map<std::string, std::shared_ptr<Connection>>&`; there should be a separate interface indicating FlowFile ownership capabilities, and `Connection` should not be a `Connectable` (this should be done after the release)
   
   for now I'll create a new method like `getFlowFileContainers` to better indicate what it's used for
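
A minimal sketch of what a method like `getFlowFileContainers` might look like, assuming simplified stand-in types. The `Connectable`, `Connection` and `Processor` structs below are hypothetical reductions, not the real MiNiFi classes, and the free-function signature is an illustration only.

```cpp
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the shared base class; not the real core::Connectable.
struct Connectable {
  explicit Connectable(std::string uuid) : uuid_(std::move(uuid)) {}
  virtual ~Connectable() = default;
  const std::string& getUUIDStr() const { return uuid_; }

 private:
  std::string uuid_;
};
struct Connection : Connectable { using Connectable::Connectable; };
struct Processor : Connectable { using Connectable::Connectable; };

using ContainerMap = std::map<std::string, std::shared_ptr<Connectable>>;

// Collects everything that can hold FlowFiles, keyed by UUID.
// The name states the purpose, so processors no longer appear in a "connection map".
void getFlowFileContainers(const std::vector<std::shared_ptr<Connection>>& connections,
                           const std::vector<std::shared_ptr<Processor>>& processors,
                           ContainerMap& containers) {
  for (const auto& connection : connections) {
    containers[connection->getUUIDStr()] = connection;
  }
  for (const auto& processor : processors) {
    // processors can also own FlowFiles
    containers[processor->getUUIDStr()] = processor;
  }
}
```

The map's value type stays `Connectable` here only because that is what currently declares the container capability; a dedicated interface could replace it later.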







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r455004287



##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -540,111 +481,97 @@ void ProcessSession::import(const std::string& source, std::vector<std::shared_p
 
   std::vector<uint8_t> buffer(getpagesize());
   try {
-    try {
-      std::ifstream input{source, std::ios::in | std::ios::binary};
-      logger_->log_debug("Opening %s", source);
-      if (!input.is_open() || !input.good()) {
-        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+    std::ifstream input{source, std::ios::in | std::ios::binary};
+    logger_->log_debug("Opening %s", source);
+    if (!input.is_open() || !input.good()) {
+      throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+    }
+    if (offset != 0U) {
+      input.seekg(offset, std::ifstream::beg);
+      if (!input.good()) {
+        logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
+        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
       }
-      if (offset != 0U) {
-        input.seekg(offset, std::ifstream::beg);
-        if (!input.good()) {
-          logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
-          throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
-        }
+    }
+    uint64_t startTime = 0U;
+    while (input.good()) {
+      input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
+      std::streamsize read = input.gcount();
+      if (read < 0) {
+        throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
       }
-      uint64_t startTime = 0U;
-      while (input.good()) {
-        input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
-        std::streamsize read = input.gcount();
-        if (read < 0) {
-          throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
-        }
-        if (read == 0) {
-          logger_->log_trace("Finished reading input %s", source);
+      if (read == 0) {
+        logger_->log_trace("Finished reading input %s", source);
+        break;
+      } else {
+        logging::LOG_TRACE(logger_) << "Read input of " << read;
+      }
+      uint8_t* begin = buffer.data();
+      uint8_t* end = begin + read;
+      while (true) {
+        startTime = getTimeMillis();
+        uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
+        const auto len = gsl::narrow<int>(delimiterPos - begin);
+
+        logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
+        /*
+         * We do not want to process the rest of the buffer after the last delimiter if
+         *  - we have reached EOF in the file (we would discard it anyway)
+         *  - there is nothing to process (the last character in the buffer is a delimiter)
+         */
+        if (delimiterPos == end && (input.eof() || len == 0)) {
           break;
-        } else {
-          logging::LOG_TRACE(logger_) << "Read input of " << read;
         }
-        uint8_t* begin = buffer.data();
-        uint8_t* end = begin + read;
-        while (true) {
-          startTime = getTimeMillis();
-          uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
-          const auto len = gsl::narrow<int>(delimiterPos - begin);
-
-          logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
-          /*
-           * We do not want to process the rest of the buffer after the last delimiter if
-           *  - we have reached EOF in the file (we would discard it anyway)
-           *  - there is nothing to process (the last character in the buffer is a delimiter)
-           */
-          if (delimiterPos == end && (input.eof() || len == 0)) {
-            break;
-          }
-
-          /* Create claim and stream if needed and append data */
-          if (claim == nullptr) {
-            startTime = getTimeMillis();
-            claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
-          }
-          if (stream == nullptr) {
-            stream = process_context_->getContentRepository()->write(claim);
-          }
-          if (stream == nullptr) {
-            logger_->log_error("Stream is null");
-            rollback();
-            return;
-          }
-          if (stream->write(begin, len) != len) {
-            logger_->log_error("Error while writing");
-            stream->closeStream();
-            throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
-          }
 
-          /* Create a FlowFile if we reached a delimiter */
-          if (delimiterPos == end) {
-            break;
-          }
-          flowFile = std::static_pointer_cast<FlowFileRecord>(create());
-          flowFile->setSize(stream->getSize());
-          flowFile->setOffset(0);
-          if (flowFile->getResourceClaim() != nullptr) {
-            /* Remove the old claim */
-            flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-            flowFile->clearResourceClaim();
-          }
-          flowFile->setResourceClaim(claim);
-          claim->increaseFlowFileRecordOwnedCount();
-          logging::LOG_DEBUG(logger_) << "Import offset " << flowFile->getOffset() << " length " << flowFile->getSize() << " content " << flowFile->getResourceClaim()->getContentFullPath()
-                                      << ", FlowFile UUID " << flowFile->getUUIDStr();
+        /* Create claim and stream if needed and append data */
+        if (claim == nullptr) {
+          startTime = getTimeMillis();
+          claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
+        }
+        if (stream == nullptr) {
+          stream = process_context_->getContentRepository()->write(claim);
+        }
+        if (stream == nullptr) {
+          logger_->log_error("Stream is null");
+          rollback();
+          return;
+        }
+        if (stream->write(begin, len) != len) {
+          logger_->log_error("Error while writing");
           stream->closeStream();
-          std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
-          uint64_t endTime = getTimeMillis();
-          provenance_report_->modifyContent(flowFile, details, endTime - startTime);
-          flows.push_back(flowFile);
-
-          /* Reset these to start processing the next FlowFile with a clean slate */
-          flowFile.reset();
-          stream.reset();
-          claim.reset();
-
-          /* Skip delimiter */
-          begin = delimiterPos + 1;
+          throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
+        }
+
+        /* Create a FlowFile if we reached a delimiter */
+        if (delimiterPos == end) {
+          break;
         }
+        flowFile = std::static_pointer_cast<FlowFileRecord>(create());
+        flowFile->setSize(stream->getSize());
+        flowFile->setOffset(0);
+        flowFile->setResourceClaim(claim);
+        logging::LOG_DEBUG(logger_) << "Import offset " << flowFile->getOffset() << " length " << flowFile->getSize() << " content " << flowFile->getResourceClaim()->getContentFullPath()
+                                    << ", FlowFile UUID " << flowFile->getUUIDStr();
+        stream->closeStream();
+        std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
+        uint64_t endTime = getTimeMillis();
+        provenance_report_->modifyContent(flowFile, details, endTime - startTime);
+        flows.push_back(flowFile);
+
+        /* Reset these to start processing the next FlowFile with a clean slate */
+        flowFile.reset();
+        stream.reset();
+        claim.reset();
+
+        /* Skip delimiter */
+        begin = delimiterPos + 1;
       }
-    } catch (std::exception &exception) {
-      logger_->log_debug("Caught Exception %s", exception.what());
-      throw;
-    } catch (...) {
-      logger_->log_debug("Caught Exception during process session write");
-      throw;
     }
+  } catch (std::exception &exception) {
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
   } catch (...) {
-    if (flowFile != nullptr && claim != nullptr && flowFile->getResourceClaim() == claim) {
-      flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-      flowFile->clearResourceClaim();
-    }

Review comment:
       before `1.0.0` we should migrate the cleanup logic to `ResourceClaim`; currently this is not feasible, as all methods in the `StreamManager` expect a `shared_ptr<ResourceClaim>`, and neither in the constructor nor in the destructor do we have the means to summon one 😞 
   
   [MINIFICPP-1294](https://issues.apache.org/jira/browse/MINIFICPP-1294)
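
The constraint can be illustrated with a small sketch. It assumes a hypothetical `Claim` type deriving from `std::enable_shared_from_this`; the thread does not say that `ResourceClaim` does this, so treat it as an assumption. Under C++17 semantics, `shared_from_this()` throws `std::bad_weak_ptr` both while the object is still being constructed and once the last owner is gone, so neither the constructor nor the destructor can obtain a `shared_ptr` to hand to an API that requires one.

```cpp
#include <memory>

// Hypothetical type; only illustrates the shared_ptr lifetime problem.
struct Claim : std::enable_shared_from_this<Claim> {
  bool* dtor_could_summon_;  // written by the destructor
  bool ctor_could_summon_;   // recorded during construction

  explicit Claim(bool* dtor_flag)
      : dtor_could_summon_(dtor_flag), ctor_could_summon_(canSummon()) {}

  ~Claim() { *dtor_could_summon_ = canSummon(); }

  // True only while some shared_ptr currently owns this object.
  bool canSummon() {
    try {
      return shared_from_this() != nullptr;
    } catch (const std::bad_weak_ptr&) {
      return false;
    }
  }
};
```

Between construction and destruction `canSummon()` succeeds, which is why the cleanup currently has to be triggered from outside the claim.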







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r456267761



##########
File path: libminifi/include/core/FlowFile.h
##########
@@ -35,9 +35,56 @@ namespace minifi {
 namespace core {
 
 class FlowFile : public core::Connectable, public ReferenceContainer {
+ private:
+  class FlowFileOwnedResourceClaimPtr{
+   public:
+    FlowFileOwnedResourceClaimPtr() = default;
+    explicit FlowFileOwnedResourceClaimPtr(const std::shared_ptr<ResourceClaim>& claim) : claim_(claim) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&& claim) : claim_(std::move(claim)) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) : claim_(ref.claim_) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(FlowFileOwnedResourceClaimPtr&& ref) : claim_(std::move(ref.claim_)) {
+      // taking ownership of claim, no need to increment/decrement
+    }
+    FlowFileOwnedResourceClaimPtr& operator=(const FlowFileOwnedResourceClaimPtr& ref) = delete;
+    FlowFileOwnedResourceClaimPtr& operator=(FlowFileOwnedResourceClaimPtr&& ref) = delete;
+
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const FlowFileOwnedResourceClaimPtr& ref) {
+      return set(owner, ref.claim_);
+    }
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const std::shared_ptr<ResourceClaim>& newClaim) {
+      auto oldClaim = claim_;
+      claim_ = newClaim;
+      // the order of increase/release is important
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+      if (oldClaim) owner.releaseClaim(oldClaim);

Review comment:
       added comment
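
One plausible reason the order matters, sketched under assumptions: if the new claim is the same object as the old one, releasing first would drop the count to zero and trigger removal before the count is raised again. `Claim` and `Handle` below are hypothetical simplifications, and `release()` stands in for the decrement plus removal of orphaned content.

```cpp
#include <memory>

// Hypothetical claim: content is removed when the owner count reaches zero.
struct Claim {
  int owners = 0;
  bool content_removed = false;
  void increase() { ++owners; }
  void release() {
    if (--owners == 0) content_removed = true;
  }
};

struct Handle {
  std::shared_ptr<Claim> claim;

  // Order used in the diff: increase the new claim, then release the old one.
  void setIncreaseFirst(const std::shared_ptr<Claim>& next) {
    auto old = claim;
    claim = next;
    if (claim) claim->increase();
    if (old) old->release();
  }

  // Reversed order, shown only for contrast.
  void setReleaseFirst(const std::shared_ptr<Claim>& next) {
    if (claim) claim->release();
    claim = next;
    if (claim) claim->increase();
  }
};
```

Re-setting the same claim through `setIncreaseFirst` leaves the content intact, while `setReleaseFirst` ends with a count of one but content that has already been removed.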

##########
File path: libminifi/include/core/FlowFile.h
##########
@@ -35,9 +36,58 @@ namespace minifi {
 namespace core {
 
 class FlowFile : public core::Connectable, public ReferenceContainer {
+ private:
+  class FlowFileOwnedResourceClaimPtr{
+   public:
+    FlowFileOwnedResourceClaimPtr() = default;
+    explicit FlowFileOwnedResourceClaimPtr(const std::shared_ptr<ResourceClaim>& claim) : claim_(claim) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&& claim) : claim_(std::move(claim)) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) : claim_(ref.claim_) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }

Review comment:
       yep, added







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#issuecomment-661751938


   - the `Fix bug` is about how we have to decrement the claim's counter on behalf of the previously persisted instance in case we overwrote it in the repository
   - the `Fix segfault` is, well, fixing a segfault caused by [this issue](https://issues.apache.org/jira/browse/MINIFICPP-1300), i.e. the `FlowController` never dying, and thus not stopping the other threads
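
As I read the first bullet, the idea can be sketched as follows. `Repo`, `PersistedRecord` and `Claim` are hypothetical names, and the in-memory map is only a stand-in for the real repository: when a put overwrites an entry, the claim referenced by the previously persisted entry loses one owner.

```cpp
#include <map>
#include <memory>
#include <string>

struct Claim {
  int owners = 0;
};

// What the (hypothetical) repository stores for one FlowFile.
struct PersistedRecord {
  std::shared_ptr<Claim> claim;
};

class Repo {
 public:
  void put(const std::string& uuid, const std::shared_ptr<Claim>& claim) {
    // Count the newly persisted instance first.
    if (claim) ++claim->owners;
    // Then decrement on behalf of the instance being overwritten, if any.
    auto it = store_.find(uuid);
    if (it != store_.end() && it->second.claim) {
      --it->second.claim->owners;
    }
    store_[uuid] = PersistedRecord{claim};
  }

 private:
  std::map<std::string, PersistedRecord> store_;
};
```

Without the decrement, the first claim would keep a phantom owner after being overwritten and could never be considered orphaned.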





[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r452660147



##########
File path: extensions/rocksdb-repos/FlowFileRepository.cpp
##########
@@ -156,14 +156,12 @@ void FlowFileRepository::prune_stored_flowfiles() {
         search->second->put(eventRead);
       } else {
         logger_->log_warn("Could not find connection for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath());
-        if (eventRead->getContentFullPath().length() > 0) {
-          if (nullptr != eventRead->getResourceClaim()) {
-            content_repo_->remove(eventRead->getResourceClaim());
-          }
-        }
+        auto claim = eventRead->getResourceClaim();
+        if (claim) claim->decreaseFlowFileRecordOwnedCount();

Review comment:
       here we decrement on behalf of the persisted FlowFile we just read from the repo, then mark the FlowFile for deletion







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454834083



##########
File path: libminifi/src/FlowFileRecord.cpp
##########
@@ -118,28 +110,27 @@ FlowFileRecord::~FlowFileRecord() {
     logger_->log_debug("Delete FlowFile UUID %s", uuidStr_);
   else
     logger_->log_debug("Delete SnapShot FlowFile UUID %s", uuidStr_);
-  if (claim_) {
-    releaseClaim(claim_);
-  } else {
+
+  if (!claim_) {
     logger_->log_debug("Claim is null ptr for %s", uuidStr_);
   }
 
+  claim_.set(*this, nullptr);
+
   // Disown stash claims
-  for (const auto &stashPair : stashedContent_) {
-    releaseClaim(stashPair.second);
+  for (auto &stashPair : stashedContent_) {
+    auto& stashClaim = stashPair.second;
+    stashClaim.set(*this, nullptr);
   }
 }
 
 void FlowFileRecord::releaseClaim(std::shared_ptr<ResourceClaim> claim) {
   // Decrease the flow file record owned count for the resource claim
-  claim_->decreaseFlowFileRecordOwnedCount();
-  std::string value;
-  logger_->log_debug("Delete Resource Claim %s, %s, attempt %llu", getUUIDStr(), claim_->getContentFullPath(), claim_->getFlowFileRecordOwnedCount());
-  if (claim_->getFlowFileRecordOwnedCount() <= 0) {
-    // we cannot rely on the stored variable here since we aren't guaranteed atomicity
-    if (flow_repository_ != nullptr && !flow_repository_->Get(uuidStr_, value)) {
-      logger_->log_debug("Delete Resource Claim %s", claim_->getContentFullPath());
-      content_repo_->remove(claim_);
+  claim->decreaseFlowFileRecordOwnedCount();
+  logger_->log_debug("Detaching Resource Claim %s, %s, attempt %llu", getUUIDStr(), claim->getContentFullPath(), claim->getFlowFileRecordOwnedCount());

Review comment:
       done







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454824460



##########
File path: libminifi/include/core/Repository.h
##########
@@ -228,6 +232,8 @@ class Repository : public virtual core::SerializableComponent, public core::Trac
   Repository &operator=(const Repository &parent) = delete;
 
  protected:
+  std::map<std::string, std::shared_ptr<core::Connectable>> containers;

Review comment:
       currently `Connectable` declares the `put` method, meaning that `Connectable`s are exactly what can be containers, later I would like to create a `FlowFileContainer` class and separate this capability out
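
A rough sketch of how that capability might be separated out, assuming a hypothetical `FlowFileContainer` interface that declares only `put`. Nothing below is existing MiNiFi API; `FlowFile` and `Connection` are reduced to the minimum needed for the example.

```cpp
#include <cstddef>
#include <memory>
#include <queue>

struct FlowFile {};

// Hypothetical interface: describes only the ability to hold FlowFiles.
class FlowFileContainer {
 public:
  virtual ~FlowFileContainer() = default;
  virtual void put(const std::shared_ptr<FlowFile>& flow) = 0;
};

// A connection implements the capability without having to be a Connectable.
class Connection : public FlowFileContainer {
 public:
  void put(const std::shared_ptr<FlowFile>& flow) override { queue_.push(flow); }
  std::size_t size() const { return queue_.size(); }

 private:
  std::queue<std::shared_ptr<FlowFile>> queue_;
};
```

With such an interface, the `containers` member could map UUIDs to `std::shared_ptr<FlowFileContainer>` instead of `std::shared_ptr<core::Connectable>`.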







[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r451670403



##########
File path: libminifi/src/core/ProcessGroup.cpp
##########
@@ -353,6 +353,10 @@ void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connecta
     connectionMap[connection->getUUIDStr()] = connection;
     connectionMap[connection->getName()] = connection;
   }
+  for (auto processor : processors_) {
+    // processors can also own FlowFiles
+    connectionMap[processor->getUUIDStr()] = processor;

Review comment:
       I think it's confusing to place processors in things like `connectionMap` (i.e. treat them like connections) just because they can own flow files.

##########
File path: libminifi/src/Connection.cpp
##########
@@ -268,10 +229,12 @@ void Connection::drain(bool delete_permanently) {
   while (!queue_.empty()) {
     std::shared_ptr<core::FlowFile> item = queue_.front();
     queue_.pop();
-    logger_->log_debug("Delete flow file UUID %s from connection %s", item->getUUIDStr(), name_);
+    logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
     if (delete_permanently) {
-      if (flow_repository_->Delete(item->getUUIDStr())) {
+      if (item->isStored() && flow_repository_->Delete(item->getUUIDStr())) {
         item->setStoredToRepository(false);
+        auto claim = item->getResourceClaim();
+        if (claim) claim->decreaseFlowFileRecordOwnedCount();

Review comment:
       Shouldn't this call `item->releaseClaim(claim)` instead?

##########
File path: libminifi/include/core/FlowFile.h
##########
@@ -35,9 +36,58 @@ namespace minifi {
 namespace core {
 
 class FlowFile : public core::Connectable, public ReferenceContainer {
+ private:
+  class FlowFileOwnedResourceClaimPtr{
+   public:
+    FlowFileOwnedResourceClaimPtr() = default;
+    explicit FlowFileOwnedResourceClaimPtr(const std::shared_ptr<ResourceClaim>& claim) : claim_(claim) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&& claim) : claim_(std::move(claim)) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) : claim_(ref.claim_) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }

Review comment:
       I suggest making `claim_` `gsl::not_null` unless there is a reason why it cannot be done.

##########
File path: libminifi/include/core/FlowFile.h
##########
@@ -35,9 +36,58 @@ namespace minifi {
 namespace core {
 
 class FlowFile : public core::Connectable, public ReferenceContainer {
+ private:
+  class FlowFileOwnedResourceClaimPtr{
+   public:
+    FlowFileOwnedResourceClaimPtr() = default;
+    explicit FlowFileOwnedResourceClaimPtr(const std::shared_ptr<ResourceClaim>& claim) : claim_(claim) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&& claim) : claim_(std::move(claim)) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) : claim_(ref.claim_) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(FlowFileOwnedResourceClaimPtr&& ref) : claim_(std::move(ref.claim_)) {
+      // taking ownership of claim, no need to increment/decrement
+    }
+    FlowFileOwnedResourceClaimPtr& operator=(const FlowFileOwnedResourceClaimPtr& ref) = delete;
+    FlowFileOwnedResourceClaimPtr& operator=(FlowFileOwnedResourceClaimPtr&& ref) = delete;
+
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const FlowFileOwnedResourceClaimPtr& ref) {
+      return set(owner, ref.claim_);
+    }
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const std::shared_ptr<ResourceClaim>& newClaim) {
+      auto oldClaim = claim_;
+      claim_ = newClaim;
+      // the order of increase/release is important
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+      if (oldClaim) owner.releaseClaim(oldClaim);
+      return *this;
+    }
+    const std::shared_ptr<ResourceClaim>& get() const {
+      return claim_;
+    }
+    const std::shared_ptr<ResourceClaim>& operator->() const {
+      return claim_;
+    }
+    operator bool() const noexcept {
+      return static_cast<bool>(claim_);
+    }
+    ~FlowFileOwnedResourceClaimPtr() {
+      // allow the owner FlowFile to manually release the claim
+      // while logging stuff and removing it from repositories
+      assert(!claim_);

Review comment:
       I think it would be cleaner to let this object store a not_null observer pointer to its owning flow file and let it call owner->releaseClaim whenever needed.
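
A sketch of this suggestion, under the assumption that the owner outlives the handle. `Owner`, `OwnedClaimPtr` and `Claim` are hypothetical simplifications; a raw pointer initialized from a reference stands in for a `not_null` observer pointer.

```cpp
#include <memory>

struct Claim {
  int owners = 0;
};

// Hypothetical stand-in for the owning FlowFile.
struct Owner {
  int released = 0;
  void releaseClaim(const std::shared_ptr<Claim>& claim) {
    --claim->owners;
    ++released;
  }
};

class OwnedClaimPtr {
 public:
  explicit OwnedClaimPtr(Owner& owner) : owner_(&owner) {}
  OwnedClaimPtr(const OwnedClaimPtr&) = delete;
  OwnedClaimPtr& operator=(const OwnedClaimPtr&) = delete;

  // The handle releases through its owner, so no manual reset is needed.
  ~OwnedClaimPtr() { set(nullptr); }

  void set(const std::shared_ptr<Claim>& next) {
    auto old = claim_;
    claim_ = next;
    // the order of increase/release is important
    if (claim_) ++claim_->owners;
    if (old) owner_->releaseClaim(old);
  }

 private:
  Owner* owner_;  // non-owning; never null because it is taken from a reference
  std::shared_ptr<Claim> claim_;
};
```

One caveat worth checking before adopting this: if `releaseClaim` is virtual and implemented in a derived class, a member's destructor runs after the derived part of the owner has already been destroyed, so the call would no longer reach the derived implementation.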

##########
File path: extensions/rocksdb-repos/FlowFileRepository.cpp
##########
@@ -156,14 +156,12 @@ void FlowFileRepository::prune_stored_flowfiles() {
         search->second->put(eventRead);
       } else {
         logger_->log_warn("Could not find connection for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath());
-        if (eventRead->getContentFullPath().length() > 0) {
-          if (nullptr != eventRead->getResourceClaim()) {
-            content_repo_->remove(eventRead->getResourceClaim());
-          }
-        }
+        auto claim = eventRead->getResourceClaim();
+        if (claim) claim->decreaseFlowFileRecordOwnedCount();

Review comment:
       Isn't this going to leave content repo entries behind?

##########
File path: libminifi/src/FlowFileRecord.cpp
##########
@@ -118,28 +110,27 @@ FlowFileRecord::~FlowFileRecord() {
     logger_->log_debug("Delete FlowFile UUID %s", uuidStr_);
   else
     logger_->log_debug("Delete SnapShot FlowFile UUID %s", uuidStr_);
-  if (claim_) {
-    releaseClaim(claim_);
-  } else {
+
+  if (!claim_) {
     logger_->log_debug("Claim is null ptr for %s", uuidStr_);
   }
 
+  claim_.set(*this, nullptr);
+
   // Disown stash claims
-  for (const auto &stashPair : stashedContent_) {
-    releaseClaim(stashPair.second);
+  for (auto &stashPair : stashedContent_) {
+    auto& stashClaim = stashPair.second;
+    stashClaim.set(*this, nullptr);
   }
 }
 
 void FlowFileRecord::releaseClaim(std::shared_ptr<ResourceClaim> claim) {
   // Decrease the flow file record owned count for the resource claim
-  claim_->decreaseFlowFileRecordOwnedCount();
-  std::string value;
-  logger_->log_debug("Delete Resource Claim %s, %s, attempt %llu", getUUIDStr(), claim_->getContentFullPath(), claim_->getFlowFileRecordOwnedCount());
-  if (claim_->getFlowFileRecordOwnedCount() <= 0) {
-    // we cannot rely on the stored variable here since we aren't guaranteed atomicity
-    if (flow_repository_ != nullptr && !flow_repository_->Get(uuidStr_, value)) {
-      logger_->log_debug("Delete Resource Claim %s", claim_->getContentFullPath());
-      content_repo_->remove(claim_);
+  claim->decreaseFlowFileRecordOwnedCount();
+  logger_->log_debug("Detaching Resource Claim %s, %s, attempt %llu", getUUIDStr(), claim->getContentFullPath(), claim->getFlowFileRecordOwnedCount());
+  if (content_repo_) {
+    if (content_repo_->removeIfOrphaned(claim)) {

Review comment:
       Could use `operator&&`
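
For illustration, the two forms side by side. `ContentRepo` here is a hypothetical reduction whose `removeIfOrphaned` takes no arguments, unlike the call in the diff; the point is only that `&&` short-circuits, so the right-hand side is never evaluated on a null pointer.

```cpp
// Hypothetical, simplified content repository.
struct ContentRepo {
  int calls = 0;
  bool orphaned = true;
  bool removeIfOrphaned() {
    ++calls;
    return orphaned;
  }
};

// Nested form, as in the diff.
bool releaseNested(ContentRepo* repo) {
  if (repo) {
    if (repo->removeIfOrphaned()) {
      return true;
    }
  }
  return false;
}

// Combined form: the call happens only when repo is non-null.
bool releaseCombined(ContentRepo* repo) {
  return repo && repo->removeIfOrphaned();
}
```

Both functions behave identically; the combined form just removes one level of nesting.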







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r448255811



##########
File path: libminifi/test/BufferReader.h
##########
@@ -0,0 +1,51 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_BUFFERREADER_H
+#define NIFI_MINIFI_CPP_BUFFERREADER_H
+
+#include "FlowFileRecord.h"
+
+class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
+ public:
+  explicit BufferReader(std::vector<uint8_t>& buffer) : buffer_(buffer){}
+  template<class Input>
+  int write(Input input, std::size_t len) {

Review comment:
       done
   
   (I don't remember what I was planning with the template)







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r452725094



##########
File path: libminifi/src/core/ProcessGroup.cpp
##########
@@ -353,6 +353,10 @@ void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connecta
     connectionMap[connection->getUUIDStr()] = connection;
     connectionMap[connection->getName()] = connection;
   }
+  for (auto processor : processors_) {
+    // processors can also own FlowFiles
+    connectionMap[processor->getUUIDStr()] = processor;

Review comment:
       done







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454842291



##########
File path: libminifi/include/core/FlowFile.h
##########
@@ -35,9 +35,56 @@ namespace minifi {
 namespace core {
 
 class FlowFile : public core::Connectable, public ReferenceContainer {
+ private:
+  class FlowFileOwnedResourceClaimPtr{
+   public:
+    FlowFileOwnedResourceClaimPtr() = default;
+    explicit FlowFileOwnedResourceClaimPtr(const std::shared_ptr<ResourceClaim>& claim) : claim_(claim) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&& claim) : claim_(std::move(claim)) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) : claim_(ref.claim_) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(FlowFileOwnedResourceClaimPtr&& ref) : claim_(std::move(ref.claim_)) {
+      // taking ownership of claim, no need to increment/decrement
+    }
+    FlowFileOwnedResourceClaimPtr& operator=(const FlowFileOwnedResourceClaimPtr& ref) = delete;
+    FlowFileOwnedResourceClaimPtr& operator=(FlowFileOwnedResourceClaimPtr&& ref) = delete;
+
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const FlowFileOwnedResourceClaimPtr& ref) {
+      return set(owner, ref.claim_);
+    }
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const std::shared_ptr<ResourceClaim>& newClaim) {
+      auto oldClaim = claim_;
+      claim_ = newClaim;
+      // the order of increase/release is important
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+      if (oldClaim) owner.releaseClaim(oldClaim);

Review comment:
       With refcount manipulation we always increment first and then decrement, so that we don't accidentally discard the object out from under ourselves. Note that an equality check will not suffice, because two `ResourceClaim` instances can reference the same file (they may have the same contentPath).
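
       The ordering argument can be sketched as follows. This is an illustration under simplifying assumptions: `Claim` and `Holder` are hypothetical types, and a single instance is reassigned to itself, whereas the real concern is two `ResourceClaim` instances that point at the same content path.

```cpp
#include <memory>

// Hypothetical stand-in for a claim whose backing content is dropped
// as soon as its owner count reaches zero.
struct Claim {
  int owners = 0;
  bool contentRemoved = false;
  void increase() { ++owners; }
  void decrease() {
    if (--owners == 0) contentRemoved = true;
  }
};

struct Holder {
  std::shared_ptr<Claim> claim;

  // Unsafe order: the old claim is released before the new one is acquired.
  void setDecrementFirst(const std::shared_ptr<Claim>& next) {
    if (claim) claim->decrease();
    claim = next;
    if (claim) claim->increase();
  }

  // Safe order: acquire the new claim first, then release the old one.
  void setIncrementFirst(const std::shared_ptr<Claim>& next) {
    auto old = claim;
    claim = next;
    if (claim) claim->increase();
    if (old) old->decrease();
  }
};
```

       When the old and new claims refer to the same content, `setDecrementFirst` lets the count touch zero and the content is removed even though it is still in use; `setIncrementFirst` keeps the count at one or above throughout.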







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r456401011



##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -540,111 +481,97 @@ void ProcessSession::import(const std::string& source, std::vector<std::shared_p
 
   std::vector<uint8_t> buffer(getpagesize());
   try {
-    try {
-      std::ifstream input{source, std::ios::in | std::ios::binary};
-      logger_->log_debug("Opening %s", source);
-      if (!input.is_open() || !input.good()) {
-        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+    std::ifstream input{source, std::ios::in | std::ios::binary};
+    logger_->log_debug("Opening %s", source);
+    if (!input.is_open() || !input.good()) {
+      throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+    }
+    if (offset != 0U) {
+      input.seekg(offset, std::ifstream::beg);
+      if (!input.good()) {
+        logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
+        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
       }
-      if (offset != 0U) {
-        input.seekg(offset, std::ifstream::beg);
-        if (!input.good()) {
-          logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
-          throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
-        }
+    }
+    uint64_t startTime = 0U;
+    while (input.good()) {
+      input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
+      std::streamsize read = input.gcount();
+      if (read < 0) {
+        throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
       }
-      uint64_t startTime = 0U;
-      while (input.good()) {
-        input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
-        std::streamsize read = input.gcount();
-        if (read < 0) {
-          throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
-        }
-        if (read == 0) {
-          logger_->log_trace("Finished reading input %s", source);
+      if (read == 0) {
+        logger_->log_trace("Finished reading input %s", source);
+        break;
+      } else {
+        logging::LOG_TRACE(logger_) << "Read input of " << read;
+      }
+      uint8_t* begin = buffer.data();
+      uint8_t* end = begin + read;
+      while (true) {
+        startTime = getTimeMillis();
+        uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
+        const auto len = gsl::narrow<int>(delimiterPos - begin);
+
+        logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
+        /*
+         * We do not want to process the rest of the buffer after the last delimiter if
+         *  - we have reached EOF in the file (we would discard it anyway)
+         *  - there is nothing to process (the last character in the buffer is a delimiter)
+         */
+        if (delimiterPos == end && (input.eof() || len == 0)) {
           break;
-        } else {
-          logging::LOG_TRACE(logger_) << "Read input of " << read;
         }
-        uint8_t* begin = buffer.data();
-        uint8_t* end = begin + read;
-        while (true) {
-          startTime = getTimeMillis();
-          uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
-          const auto len = gsl::narrow<int>(delimiterPos - begin);
-
-          logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
-          /*
-           * We do not want to process the rest of the buffer after the last delimiter if
-           *  - we have reached EOF in the file (we would discard it anyway)
-           *  - there is nothing to process (the last character in the buffer is a delimiter)
-           */
-          if (delimiterPos == end && (input.eof() || len == 0)) {
-            break;
-          }
-
-          /* Create claim and stream if needed and append data */
-          if (claim == nullptr) {
-            startTime = getTimeMillis();
-            claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
-          }
-          if (stream == nullptr) {
-            stream = process_context_->getContentRepository()->write(claim);
-          }
-          if (stream == nullptr) {
-            logger_->log_error("Stream is null");
-            rollback();
-            return;
-          }
-          if (stream->write(begin, len) != len) {
-            logger_->log_error("Error while writing");
-            stream->closeStream();
-            throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
-          }
 
-          /* Create a FlowFile if we reached a delimiter */
-          if (delimiterPos == end) {
-            break;
-          }
-          flowFile = std::static_pointer_cast<FlowFileRecord>(create());
-          flowFile->setSize(stream->getSize());
-          flowFile->setOffset(0);
-          if (flowFile->getResourceClaim() != nullptr) {
-            /* Remove the old claim */
-            flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-            flowFile->clearResourceClaim();
-          }
-          flowFile->setResourceClaim(claim);
-          claim->increaseFlowFileRecordOwnedCount();
-          logging::LOG_DEBUG(logger_) << "Import offset " << flowFile->getOffset() << " length " << flowFile->getSize() << " content " << flowFile->getResourceClaim()->getContentFullPath()
-                                      << ", FlowFile UUID " << flowFile->getUUIDStr();
+        /* Create claim and stream if needed and append data */
+        if (claim == nullptr) {
+          startTime = getTimeMillis();
+          claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
+        }
+        if (stream == nullptr) {
+          stream = process_context_->getContentRepository()->write(claim);
+        }
+        if (stream == nullptr) {
+          logger_->log_error("Stream is null");
+          rollback();
+          return;
+        }
+        if (stream->write(begin, len) != len) {
+          logger_->log_error("Error while writing");
           stream->closeStream();
-          std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
-          uint64_t endTime = getTimeMillis();
-          provenance_report_->modifyContent(flowFile, details, endTime - startTime);
-          flows.push_back(flowFile);
-
-          /* Reset these to start processing the next FlowFile with a clean slate */
-          flowFile.reset();
-          stream.reset();
-          claim.reset();
-
-          /* Skip delimiter */
-          begin = delimiterPos + 1;
+          throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
+        }
+
+        /* Create a FlowFile if we reached a delimiter */
+        if (delimiterPos == end) {
+          break;
         }
+        flowFile = std::static_pointer_cast<FlowFileRecord>(create());
+        flowFile->setSize(stream->getSize());
+        flowFile->setOffset(0);
+        flowFile->setResourceClaim(claim);
+        logging::LOG_DEBUG(logger_) << "Import offset " << flowFile->getOffset() << " length " << flowFile->getSize() << " content " << flowFile->getResourceClaim()->getContentFullPath()
+                                    << ", FlowFile UUID " << flowFile->getUUIDStr();
+        stream->closeStream();
+        std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
+        uint64_t endTime = getTimeMillis();
+        provenance_report_->modifyContent(flowFile, details, endTime - startTime);
+        flows.push_back(flowFile);
+
+        /* Reset these to start processing the next FlowFile with a clean slate */
+        flowFile.reset();
+        stream.reset();
+        claim.reset();
+
+        /* Skip delimiter */
+        begin = delimiterPos + 1;
       }
-    } catch (std::exception &exception) {
-      logger_->log_debug("Caught Exception %s", exception.what());
-      throw;
-    } catch (...) {
-      logger_->log_debug("Caught Exception during process session write");
-      throw;
     }
+  } catch (std::exception &exception) {
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
   } catch (...) {
-    if (flowFile != nullptr && claim != nullptr && flowFile->getResourceClaim() == claim) {
-      flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-      flowFile->clearResourceClaim();
-    }

Review comment:
       Unfortunately we do not, because it is most probably destroyed through a `shared_ptr`, which first decrements the counter and then calls the destructor. With `shared_ptr` we cannot increment once the counter has reached 0 (as it just did, since that is what triggers the destructor), so we get a nice `bad_weak_ptr`.
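
       A small sketch of the failure mode described here. `Record` is a hypothetical type, and an explicit `std::weak_ptr` member stands in for the weak reference that `std::enable_shared_from_this` keeps internally:

```cpp
#include <memory>

// Set by the destructor so the outcome can be observed after the object is gone.
static bool g_lockFailedInDestructor = false;

// Hypothetical type; not a MiNiFi class.
struct Record {
  std::weak_ptr<Record> self;  // weak reference to the owning shared_ptr

  ~Record() {
    try {
      // The last shared_ptr has already dropped the use count to zero,
      // so constructing a new owner from the weak reference fails.
      std::shared_ptr<Record> again(self);
      (void)again;
    } catch (const std::bad_weak_ptr&) {
      g_lockFailedInDestructor = true;
    }
  }
};

// Creates and destroys a Record, reporting whether re-acquiring ownership
// inside the destructor threw std::bad_weak_ptr.
bool destroyAndReport() {
  g_lockFailedInDestructor = false;
  {
    auto record = std::make_shared<Record>();
    record->self = record;
  }  // use count reaches zero here, then ~Record runs
  return g_lockFailedInDestructor;
}
```

       The exception is caught inside the destructor only so that the sketch can report it; letting it escape a destructor would terminate the program.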







[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r455686258



##########
File path: libminifi/include/core/FlowFile.h
##########
@@ -35,9 +35,56 @@ namespace minifi {
 namespace core {
 
 class FlowFile : public core::Connectable, public ReferenceContainer {
+ private:
+  class FlowFileOwnedResourceClaimPtr{
+   public:
+    FlowFileOwnedResourceClaimPtr() = default;
+    explicit FlowFileOwnedResourceClaimPtr(const std::shared_ptr<ResourceClaim>& claim) : claim_(claim) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&& claim) : claim_(std::move(claim)) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) : claim_(ref.claim_) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(FlowFileOwnedResourceClaimPtr&& ref) : claim_(std::move(ref.claim_)) {
+      // taking ownership of claim, no need to increment/decrement
+    }
+    FlowFileOwnedResourceClaimPtr& operator=(const FlowFileOwnedResourceClaimPtr& ref) = delete;
+    FlowFileOwnedResourceClaimPtr& operator=(FlowFileOwnedResourceClaimPtr&& ref) = delete;
+
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const FlowFileOwnedResourceClaimPtr& ref) {
+      return set(owner, ref.claim_);
+    }
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const std::shared_ptr<ResourceClaim>& newClaim) {
+      auto oldClaim = claim_;
+      claim_ = newClaim;
+      // the order of increase/release is important
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+      if (oldClaim) owner.releaseClaim(oldClaim);

Review comment:
       I realize that my usage of the word "comment" was ambiguous here. I meant: could you please extend the code comment on line 63 to include this information? This is something that would be useful to understand, e.g. in a future refactoring.







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454833537



##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -248,35 +237,27 @@ void ProcessSession::penalize(const std::shared_ptr<core::FlowFile> &flow) {
 void ProcessSession::transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship) {
   logging::LOG_INFO(logger_) << "Transferring " << flow->getUUIDStr() << " from " << process_context_->getProcessorNode()->getName() << " to relationship " << relationship.getName();
   _transferRelationship[flow->getUUIDStr()] = relationship;
+  flow->setDeleted(false);

Review comment:
       During `commit` and `rollback` we check whether the items in `_deletedFlowFiles` are indeed still marked deleted, or whether a transfer or some other operation has marked them for "resurrection".
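
       A simplified sketch of that check, assuming hypothetical `File` and `Session` types (the real `ProcessSession` does considerably more, and `purged` is an illustrative counter only):

```cpp
#include <memory>
#include <vector>

// Hypothetical, simplified stand-ins for FlowFile and ProcessSession.
struct File {
  bool deleted = false;
};

struct Session {
  std::vector<std::shared_ptr<File>> deletedFiles;  // candidates recorded by remove()
  int purged = 0;                                   // how many files commit() actually dropped

  void remove(const std::shared_ptr<File>& f) {
    f->deleted = true;
    deletedFiles.push_back(f);
  }

  // A later transfer clears the flag, "resurrecting" the file.
  void transfer(const std::shared_ptr<File>& f) { f->deleted = false; }

  void commit() {
    for (const auto& f : deletedFiles) {
      if (!f->deleted) continue;  // resurrected since remove(), so keep it
      ++purged;
    }
    deletedFiles.clear();
  }
};
```

       With this shape the list does not need to be edited when a file is transferred after being removed; the flag alone decides the outcome at commit time.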







[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r453579663



##########
File path: libminifi/include/core/Repository.h
##########
@@ -228,6 +232,8 @@ class Repository : public virtual core::SerializableComponent, public core::Trac
   Repository &operator=(const Repository &parent) = delete;
 
  protected:
+  std::map<std::string, std::shared_ptr<core::Connectable>> containers;

Review comment:
       This could use a comment, "containers" is not clear IMO. What can be containers, and what not? Does it include flow files, connections, processors, S2S client, etc.? All of these are derived from `Connectable`.

##########
File path: libminifi/test/persistence-tests/PersistenceTests.cpp
##########
@@ -0,0 +1,218 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+#include <map>
+#include <memory>
+#include <string>
+#include <thread>
+
+#include "core/Core.h"
+#include "core/repository/AtomicRepoEntries.h"
+#include "core/RepositoryFactory.h"
+#include "FlowFileRecord.h"
+#include "FlowFileRepository.h"
+#include "properties/Configure.h"
+#include "../unit/ProvenanceTestHelper.h"
+#include "../TestBase.h"
+#include "../../extensions/libarchive/MergeContent.h"
+#include "../test/BufferReader.h"
+
+using Connection = minifi::Connection;
+using MergeContent = minifi::processors::MergeContent;
+
+struct TestFlow{
+  TestFlow(const std::shared_ptr<core::repository::FlowFileRepository>& ff_repository, const std::shared_ptr<core::ContentRepository>& content_repo, const std::shared_ptr<core::Repository>& prov_repo)
+      : ff_repository(ff_repository), content_repo(content_repo), prov_repo(prov_repo) {
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+
+    // setup MERGE processor
+    {
+      merge = std::make_shared<MergeContent>("MergeContent", mergeProcUUID());
+      merge->initialize();
+      merge->setAutoTerminatedRelationships({{"original", "d"}});
+
+      merge->setProperty(MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+      merge->setProperty(MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK);
+      merge->setProperty(MergeContent::DelimiterStrategy, DELIMITER_STRATEGY_TEXT);
+      merge->setProperty(MergeContent::MinEntries, "3");
+      merge->setProperty(MergeContent::Header, "_Header_");
+      merge->setProperty(MergeContent::Footer, "_Footer_");
+      merge->setProperty(MergeContent::Demarcator, "_Demarcator_");
+      merge->setProperty(MergeContent::MaxBinAge, "1 h");
+
+      std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(merge);
+      mergeContext = std::make_shared<core::ProcessContext>(node, controller_services_provider, prov_repo, ff_repository, content_repo);
+    }
+
+    // setup INPUT processor
+    {
+      inputProcessor = std::make_shared<core::Processor>("source", inputProcUUID());
+      std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(inputProcessor);
+      inputContext = std::make_shared<core::ProcessContext>(node, controller_services_provider, prov_repo,
+                                                            ff_repository, content_repo);
+    }
+
+    // setup Input Connection
+    {
+      input = std::make_shared<Connection>(ff_repository, content_repo, "Input", inputConnUUID());
+      input->setRelationship({"input", "d"});
+      input->setDestinationUUID(mergeProcUUID());
+      input->setSourceUUID(inputProcUUID());
+      inputProcessor->addConnection(input);
+    }
+
+    // setup Output Connection
+    {
+      output = std::make_shared<Connection>(ff_repository, content_repo, "Output", outputConnUUID());
+      output->setRelationship(MergeContent::Merge);
+      output->setSourceUUID(mergeProcUUID());
+    }
+
+    // setup ProcessGroup
+    {
+      root = std::make_shared<core::ProcessGroup>(core::ProcessGroupType::ROOT_PROCESS_GROUP, "root");
+      root->addProcessor(merge);
+      root->addConnection(input);
+      root->addConnection(output);
+    }
+
+    // prepare Merge Processor for execution
+    merge->setScheduledState(core::ScheduledState::RUNNING);
+    merge->onSchedule(mergeContext.get(), new core::ProcessSessionFactory(mergeContext));
+  }
+  void write(const std::string& data) {
+    minifi::io::DataStream stream(reinterpret_cast<const uint8_t*>(data.c_str()), data.length());
+    core::ProcessSession sessionGenFlowFile(inputContext);
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
+    sessionGenFlowFile.importFrom(stream, flow);
+    sessionGenFlowFile.transfer(flow, {"input", "d"});
+    sessionGenFlowFile.commit();

Review comment:
       I think assertions on the `ResourceClaim` refcount would be useful documentation here.

##########
File path: extensions/rocksdb-repos/FlowFileRepository.cpp
##########
@@ -148,22 +148,27 @@ void FlowFileRepository::prune_stored_flowfiles() {
     std::string key = it->key().ToString();
     if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) {
       logger_->log_debug("Found connection for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath());
-      auto search = connectionMap.find(eventRead->getConnectionUuid());
-      if (!corrupt_checkpoint && search != connectionMap.end()) {
+      bool found = false;
+      auto search = containers.find(eventRead->getConnectionUuid());
+      found = (search != containers.end());
+      if (!found) {
+        // for backward compatibility
+        search = connectionMap.find(eventRead->getConnectionUuid());
+        found = (search != connectionMap.end());
+      }

Review comment:
       We should document if `connectionMap` is now deprecated, for example near its declaration.

##########
File path: libminifi/src/FlowFileRecord.cpp
##########
@@ -118,28 +110,27 @@ FlowFileRecord::~FlowFileRecord() {
     logger_->log_debug("Delete FlowFile UUID %s", uuidStr_);
   else
     logger_->log_debug("Delete SnapShot FlowFile UUID %s", uuidStr_);
-  if (claim_) {
-    releaseClaim(claim_);
-  } else {
+
+  if (!claim_) {
     logger_->log_debug("Claim is null ptr for %s", uuidStr_);
   }
 
+  claim_.set(*this, nullptr);
+
   // Disown stash claims
-  for (const auto &stashPair : stashedContent_) {
-    releaseClaim(stashPair.second);
+  for (auto &stashPair : stashedContent_) {
+    auto& stashClaim = stashPair.second;
+    stashClaim.set(*this, nullptr);
   }
 }
 
 void FlowFileRecord::releaseClaim(std::shared_ptr<ResourceClaim> claim) {
   // Decrease the flow file record owned count for the resource claim
-  claim_->decreaseFlowFileRecordOwnedCount();
-  std::string value;
-  logger_->log_debug("Delete Resource Claim %s, %s, attempt %llu", getUUIDStr(), claim_->getContentFullPath(), claim_->getFlowFileRecordOwnedCount());
-  if (claim_->getFlowFileRecordOwnedCount() <= 0) {
-    // we cannot rely on the stored variable here since we aren't guaranteed atomicity
-    if (flow_repository_ != nullptr && !flow_repository_->Get(uuidStr_, value)) {
-      logger_->log_debug("Delete Resource Claim %s", claim_->getContentFullPath());
-      content_repo_->remove(claim_);
+  claim->decreaseFlowFileRecordOwnedCount();
+  logger_->log_debug("Detaching Resource Claim %s, %s, attempt %llu", getUUIDStr(), claim->getContentFullPath(), claim->getFlowFileRecordOwnedCount());

Review comment:
       Would you mind changing `"%llu"` to `"%" PRIu64`? `claim->getFlowFileRecordOwnedCount()` returns `uint64_t`.
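
       For illustration, a minimal sketch of the portable format macro. `formatOwnedCount` is a hypothetical helper that uses `std::snprintf` rather than the MiNiFi logger:

```cpp
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <string>

// Formats a 64-bit counter portably. PRIu64 expands to the correct
// conversion specifier for uint64_t on the current platform, whereas
// "%llu" is only correct where uint64_t is unsigned long long.
std::string formatOwnedCount(uint64_t count) {
  char buf[64];
  std::snprintf(buf, sizeof(buf), "attempt %" PRIu64, count);
  return std::string(buf);
}
```

       The macro is a string literal, so it concatenates with the surrounding format string at compile time.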

##########
File path: libminifi/include/core/FlowFile.h
##########
@@ -35,9 +35,56 @@ namespace minifi {
 namespace core {
 
 class FlowFile : public core::Connectable, public ReferenceContainer {
+ private:
+  class FlowFileOwnedResourceClaimPtr{
+   public:
+    FlowFileOwnedResourceClaimPtr() = default;
+    explicit FlowFileOwnedResourceClaimPtr(const std::shared_ptr<ResourceClaim>& claim) : claim_(claim) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&& claim) : claim_(std::move(claim)) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) : claim_(ref.claim_) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(FlowFileOwnedResourceClaimPtr&& ref) : claim_(std::move(ref.claim_)) {
+      // taking ownership of claim, no need to increment/decrement
+    }
+    FlowFileOwnedResourceClaimPtr& operator=(const FlowFileOwnedResourceClaimPtr& ref) = delete;
+    FlowFileOwnedResourceClaimPtr& operator=(FlowFileOwnedResourceClaimPtr&& ref) = delete;
+
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const FlowFileOwnedResourceClaimPtr& ref) {
+      return set(owner, ref.claim_);
+    }
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const std::shared_ptr<ResourceClaim>& newClaim) {
+      auto oldClaim = claim_;
+      claim_ = newClaim;
+      // the order of increase/release is important
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+      if (oldClaim) owner.releaseClaim(oldClaim);

Review comment:
       Why is it important? Could you add the explanation to the comment?

##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -248,35 +237,27 @@ void ProcessSession::penalize(const std::shared_ptr<core::FlowFile> &flow) {
 void ProcessSession::transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship) {
   logging::LOG_INFO(logger_) << "Transferring " << flow->getUUIDStr() << " from " << process_context_->getProcessorNode()->getName() << " to relationship " << relationship.getName();
   _transferRelationship[flow->getUUIDStr()] = relationship;
+  flow->setDeleted(false);

Review comment:
       Shouldn't we also remove the flow file from `_deletedFlowFiles` if present? Same with `add`, but I think it's less likely that we want to add a deleted flow file again. 

##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -211,15 +206,6 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(const std::shared_ptr<core
 
 void ProcessSession::remove(const std::shared_ptr<core::FlowFile> &flow) {
   flow->setDeleted(true);
-  if (flow->getResourceClaim() != nullptr) {
-    flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-    logger_->log_debug("Auto terminated %s %" PRIu64 " %s", flow->getResourceClaim()->getContentFullPath(), flow->getResourceClaim()->getFlowFileRecordOwnedCount(), flow->getUUIDStr());
-  } else {
-    logger_->log_debug("Flow does not contain content. no resource claim to decrement.");

Review comment:
       I suggest keeping the debug logs.

##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -540,111 +481,97 @@ void ProcessSession::import(const std::string& source, std::vector<std::shared_p
 
   std::vector<uint8_t> buffer(getpagesize());
   try {
-    try {
-      std::ifstream input{source, std::ios::in | std::ios::binary};
-      logger_->log_debug("Opening %s", source);
-      if (!input.is_open() || !input.good()) {
-        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+    std::ifstream input{source, std::ios::in | std::ios::binary};
+    logger_->log_debug("Opening %s", source);
+    if (!input.is_open() || !input.good()) {
+      throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+    }
+    if (offset != 0U) {
+      input.seekg(offset, std::ifstream::beg);
+      if (!input.good()) {
+        logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
+        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
       }
-      if (offset != 0U) {
-        input.seekg(offset, std::ifstream::beg);
-        if (!input.good()) {
-          logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
-          throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
-        }
+    }
+    uint64_t startTime = 0U;
+    while (input.good()) {
+      input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
+      std::streamsize read = input.gcount();
+      if (read < 0) {
+        throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
       }
-      uint64_t startTime = 0U;
-      while (input.good()) {
-        input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
-        std::streamsize read = input.gcount();
-        if (read < 0) {
-          throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
-        }
-        if (read == 0) {
-          logger_->log_trace("Finished reading input %s", source);
+      if (read == 0) {
+        logger_->log_trace("Finished reading input %s", source);
+        break;
+      } else {
+        logging::LOG_TRACE(logger_) << "Read input of " << read;
+      }
+      uint8_t* begin = buffer.data();
+      uint8_t* end = begin + read;
+      while (true) {
+        startTime = getTimeMillis();
+        uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
+        const auto len = gsl::narrow<int>(delimiterPos - begin);
+
+        logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
+        /*
+         * We do not want to process the rest of the buffer after the last delimiter if
+         *  - we have reached EOF in the file (we would discard it anyway)
+         *  - there is nothing to process (the last character in the buffer is a delimiter)
+         */
+        if (delimiterPos == end && (input.eof() || len == 0)) {
           break;
-        } else {
-          logging::LOG_TRACE(logger_) << "Read input of " << read;
         }
-        uint8_t* begin = buffer.data();
-        uint8_t* end = begin + read;
-        while (true) {
-          startTime = getTimeMillis();
-          uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
-          const auto len = gsl::narrow<int>(delimiterPos - begin);
-
-          logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
-          /*
-           * We do not want to process the rest of the buffer after the last delimiter if
-           *  - we have reached EOF in the file (we would discard it anyway)
-           *  - there is nothing to process (the last character in the buffer is a delimiter)
-           */
-          if (delimiterPos == end && (input.eof() || len == 0)) {
-            break;
-          }
-
-          /* Create claim and stream if needed and append data */
-          if (claim == nullptr) {
-            startTime = getTimeMillis();
-            claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
-          }
-          if (stream == nullptr) {
-            stream = process_context_->getContentRepository()->write(claim);
-          }
-          if (stream == nullptr) {
-            logger_->log_error("Stream is null");
-            rollback();
-            return;
-          }
-          if (stream->write(begin, len) != len) {
-            logger_->log_error("Error while writing");
-            stream->closeStream();
-            throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
-          }
 
-          /* Create a FlowFile if we reached a delimiter */
-          if (delimiterPos == end) {
-            break;
-          }
-          flowFile = std::static_pointer_cast<FlowFileRecord>(create());
-          flowFile->setSize(stream->getSize());
-          flowFile->setOffset(0);
-          if (flowFile->getResourceClaim() != nullptr) {
-            /* Remove the old claim */
-            flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-            flowFile->clearResourceClaim();
-          }
-          flowFile->setResourceClaim(claim);
-          claim->increaseFlowFileRecordOwnedCount();
-          logging::LOG_DEBUG(logger_) << "Import offset " << flowFile->getOffset() << " length " << flowFile->getSize() << " content " << flowFile->getResourceClaim()->getContentFullPath()
-                                      << ", FlowFile UUID " << flowFile->getUUIDStr();
+        /* Create claim and stream if needed and append data */
+        if (claim == nullptr) {
+          startTime = getTimeMillis();
+          claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
+        }
+        if (stream == nullptr) {
+          stream = process_context_->getContentRepository()->write(claim);
+        }
+        if (stream == nullptr) {
+          logger_->log_error("Stream is null");
+          rollback();
+          return;
+        }
+        if (stream->write(begin, len) != len) {
+          logger_->log_error("Error while writing");
           stream->closeStream();
-          std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
-          uint64_t endTime = getTimeMillis();
-          provenance_report_->modifyContent(flowFile, details, endTime - startTime);
-          flows.push_back(flowFile);
-
-          /* Reset these to start processing the next FlowFile with a clean slate */
-          flowFile.reset();
-          stream.reset();
-          claim.reset();
-
-          /* Skip delimiter */
-          begin = delimiterPos + 1;
+          throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
+        }
+
+        /* Create a FlowFile if we reached a delimiter */
+        if (delimiterPos == end) {
+          break;
         }
+        flowFile = std::static_pointer_cast<FlowFileRecord>(create());
+        flowFile->setSize(stream->getSize());
+        flowFile->setOffset(0);
+        flowFile->setResourceClaim(claim);
+        logging::LOG_DEBUG(logger_) << "Import offset " << flowFile->getOffset() << " length " << flowFile->getSize() << " content " << flowFile->getResourceClaim()->getContentFullPath()
+                                    << ", FlowFile UUID " << flowFile->getUUIDStr();
+        stream->closeStream();
+        std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
+        uint64_t endTime = getTimeMillis();
+        provenance_report_->modifyContent(flowFile, details, endTime - startTime);
+        flows.push_back(flowFile);
+
+        /* Reset these to start processing the next FlowFile with a clean slate */
+        flowFile.reset();
+        stream.reset();
+        claim.reset();
+
+        /* Skip delimiter */
+        begin = delimiterPos + 1;
       }
-    } catch (std::exception &exception) {
-      logger_->log_debug("Caught Exception %s", exception.what());
-      throw;
-    } catch (...) {
-      logger_->log_debug("Caught Exception during process session write");
-      throw;
     }
+  } catch (std::exception &exception) {
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
   } catch (...) {
-    if (flowFile != nullptr && claim != nullptr && flowFile->getResourceClaim() == claim) {
-      flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-      flowFile->clearResourceClaim();
-    }

Review comment:
       I think we still need to clean up the resource claim (`clearResourceClaim()`) of the flow file before proceeding, because the flow file destructor will not do so. Is this done somewhere else?
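
For illustration only, here is a minimal sketch of how the manual cleanup in the `catch` block could become unnecessary if the owned-count were tied to an RAII handle. All names here (`CountedClaim`, `ClaimHandle`, `importThatThrows`) are hypothetical and are not part of MiNiFi C++; whether this PR does anything similar is not shown in the hunk above.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <utility>

// Hypothetical stand-in for a resource claim; it only models the owned-count.
struct CountedClaim {
  uint64_t owned_count{0};
};

// Hypothetical RAII handle: increments the count when it takes a claim and
// decrements it again on reset() or destruction.
class ClaimHandle {
 public:
  ClaimHandle() = default;
  explicit ClaimHandle(std::shared_ptr<CountedClaim> claim) : claim_(std::move(claim)) {
    if (claim_) ++claim_->owned_count;
  }
  ClaimHandle(const ClaimHandle&) = delete;
  ClaimHandle& operator=(const ClaimHandle&) = delete;
  ~ClaimHandle() { reset(); }

  void reset() {
    if (claim_) {
      --claim_->owned_count;
      claim_.reset();
    }
  }

 private:
  std::shared_ptr<CountedClaim> claim_;
};

// Simulates an import step that fails after the claim has been attached.
void importThatThrows(const std::shared_ptr<CountedClaim>& claim) {
  ClaimHandle handle{claim};  // owned_count becomes 1
  throw std::runtime_error("simulated write failure");
}  // the handle's destructor runs during stack unwinding and releases the count
```

With such a handle, a caller that catches the exception would observe the count back at zero without any explicit `clearResourceClaim()`-style call. If the flow file destructor does not release the claim, as the comment above assumes, the explicit cleanup is still required.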







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r452649273



##########
File path: libminifi/src/Connection.cpp
##########
@@ -268,10 +229,12 @@ void Connection::drain(bool delete_permanently) {
   while (!queue_.empty()) {
     std::shared_ptr<core::FlowFile> item = queue_.front();
     queue_.pop();
-    logger_->log_debug("Delete flow file UUID %s from connection %s", item->getUUIDStr(), name_);
+    logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
     if (delete_permanently) {
-      if (flow_repository_->Delete(item->getUUIDStr())) {
+      if (item->isStored() && flow_repository_->Delete(item->getUUIDStr())) {
         item->setStoredToRepository(false);
+        auto claim = item->getResourceClaim();
+        if (claim) claim->decreaseFlowFileRecordOwnedCount();

Review comment:
       The item still owns the `claim_`, and if somebody holds a reference to the item, they should still be able to access the content. Here we decrement the counter on behalf of the persisted FlowFile instance that we just marked for deletion.
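
As a rough sketch of the distinction described above (hypothetical types `SketchClaim` and `SketchFlowFile`, not the actual MiNiFi C++ classes): the in-memory item keeps its pointer to the claim, while the count held on behalf of the persisted record is released once that record is deleted.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>

// Hypothetical stand-in for ResourceClaim; it only models the owned-count.
class SketchClaim {
 public:
  void increaseFlowFileRecordOwnedCount() { ++owned_count_; }
  void decreaseFlowFileRecordOwnedCount() {
    if (owned_count_ > 0) --owned_count_;
  }
  uint64_t getFlowFileRecordOwnedCount() const { return owned_count_; }

 private:
  uint64_t owned_count_{0};
};

// Hypothetical stand-in for a flow file that may also exist in a repository.
struct SketchFlowFile {
  std::shared_ptr<SketchClaim> claim;
  bool stored{false};
};

// Mirrors the shape of the drain() branch: release the count that belonged to
// the persisted record, but leave the in-memory item's claim pointer intact.
bool dropPersistedRecord(SketchFlowFile& item) {
  if (!item.stored) return false;  // nothing was persisted, nothing to release
  item.stored = false;
  if (item.claim) item.claim->decreaseFlowFileRecordOwnedCount();
  return true;
}
```

In this sketch, anyone still holding the item can reach the claim through `item.claim` after the call; only the bookkeeping for the persisted copy changes.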



