You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "adamdebreceni (via GitHub)" <gi...@apache.org> on 2023/03/08 10:40:45 UTC

[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1508: MINIFICPP-2040 - Avoid deserializing flow files just to be deleted

adamdebreceni commented on code in PR #1508:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1508#discussion_r1129244175


##########
extensions/rocksdb-repos/FlowFileRepository.cpp:
##########
@@ -43,50 +43,67 @@ void FlowFileRepository::flush() {
   auto batch = opendb->createWriteBatch();
   rocksdb::ReadOptions options;
 
-  std::vector<std::shared_ptr<FlowFile>> purgeList;
-
-  std::vector<rocksdb::Slice> keys;
-  std::list<std::string> keystrings;
-  std::vector<std::string> values;
+  std::list<ExpiredFlowFileInfo> flow_files;
 
   while (keys_to_delete.size_approx() > 0) {
-    std::string key;
-    if (keys_to_delete.try_dequeue(key)) {
-      keystrings.push_back(std::move(key));  // rocksdb::Slice doesn't copy the string, only grabs ptrs. Hacky, but have to ensure the required lifetime of the strings.
-      keys.push_back(keystrings.back());
+    ExpiredFlowFileInfo info;
+    if (keys_to_delete.try_dequeue(info)) {
+      flow_files.push_back(std::move(info));
     }
   }
-  auto multistatus = opendb->MultiGet(options, keys, &values);
 
-  for (size_t i = 0; i < keys.size() && i < values.size() && i < multistatus.size(); ++i) {
-    if (!multistatus[i].ok()) {
-      logger_->log_error("Failed to read key from rocksdb: %s! DB is most probably in an inconsistent state!", keys[i].data());
-      keystrings.remove(keys[i].data());
-      continue;
+  {
+    // deserialize flow files with missing content claim
+    std::vector<rocksdb::Slice> keys;
+    std::vector<std::list<ExpiredFlowFileInfo>::iterator> key_positions;
+    for (auto it = flow_files.begin(); it != flow_files.end(); ++it) {
+      if (!it->content) {
+        keys.push_back(it->key);
+        key_positions.push_back(it);
+      }
     }
+    if (!keys.empty()) {
+      std::vector<std::string> values;
+      auto multistatus = opendb->MultiGet(options, keys, &values);
 
-    utils::Identifier containerId;
-    auto eventRead = FlowFileRecord::DeSerialize(gsl::make_span(values[i]).as_span<const std::byte>(), content_repo_, containerId);
-    if (eventRead) {
-      purgeList.push_back(eventRead);
+      for (size_t i = 0; i < keys.size() && i < values.size() && i < multistatus.size(); ++i) {
+        if (!multistatus[i].ok()) {
+          logger_->log_error("Failed to read key from rocksdb: %s! DB is most probably in an inconsistent state!", keys[i].data());
+          flow_files.erase(key_positions.at(i));
+          continue;
+        }
+
+        utils::Identifier containerId;
+        auto flow_file = FlowFileRecord::DeSerialize(gsl::make_span(values[i]).as_span<const std::byte>(), content_repo_, containerId);
+        if (flow_file) {
+          gsl_Expects(flow_file->getUUIDStr() == key_positions.at(i)->key);
+          key_positions.at(i)->content = flow_file->getResourceClaim();
+        } else {
+          logger_->log_error("Could not deserialize flow file %s", key_positions.at(i)->key);
+        }
+      }
     }
-    logger_->log_debug("Issuing batch delete, including %s, Content path %s", eventRead->getUUIDStr(), eventRead->getContentFullPath());
-    batch.Delete(keys[i]);
+  }
+
+  for (auto& ff : flow_files) {
+    batch.Delete(ff.key);
+    logger_->log_debug("Issuing batch delete, including %s, Content path %s", ff.key, ff.content ? ff.content->getContentFullPath() : "null");
   }
 
   auto operation = [&batch, &opendb]() { return opendb->Write(rocksdb::WriteOptions(), &batch); };
 
   if (!ExecuteWithRetry(operation)) {
-    for (const auto& key : keystrings) {
-      keys_to_delete.enqueue(key);  // Push back the values that we could get but couldn't delete
+    for (const auto& ff : flow_files) {
+      keys_to_delete.enqueue(ff);  // Push back the values that we could get but couldn't delete

Review Comment:
   done



##########
extensions/rocksdb-repos/FlowFileRepository.cpp:
##########
@@ -43,50 +43,67 @@ void FlowFileRepository::flush() {
   auto batch = opendb->createWriteBatch();
   rocksdb::ReadOptions options;
 
-  std::vector<std::shared_ptr<FlowFile>> purgeList;
-
-  std::vector<rocksdb::Slice> keys;
-  std::list<std::string> keystrings;
-  std::vector<std::string> values;
+  std::list<ExpiredFlowFileInfo> flow_files;
 
   while (keys_to_delete.size_approx() > 0) {

Review Comment:
   done



##########
extensions/rocksdb-repos/FlowFileRepository.cpp:
##########
@@ -43,50 +43,67 @@ void FlowFileRepository::flush() {
   auto batch = opendb->createWriteBatch();
   rocksdb::ReadOptions options;
 
-  std::vector<std::shared_ptr<FlowFile>> purgeList;
-
-  std::vector<rocksdb::Slice> keys;
-  std::list<std::string> keystrings;
-  std::vector<std::string> values;
+  std::list<ExpiredFlowFileInfo> flow_files;
 
   while (keys_to_delete.size_approx() > 0) {
-    std::string key;
-    if (keys_to_delete.try_dequeue(key)) {
-      keystrings.push_back(std::move(key));  // rocksdb::Slice doesn't copy the string, only grabs ptrs. Hacky, but have to ensure the required lifetime of the strings.
-      keys.push_back(keystrings.back());
+    ExpiredFlowFileInfo info;
+    if (keys_to_delete.try_dequeue(info)) {
+      flow_files.push_back(std::move(info));
     }
   }
-  auto multistatus = opendb->MultiGet(options, keys, &values);
 
-  for (size_t i = 0; i < keys.size() && i < values.size() && i < multistatus.size(); ++i) {
-    if (!multistatus[i].ok()) {
-      logger_->log_error("Failed to read key from rocksdb: %s! DB is most probably in an inconsistent state!", keys[i].data());
-      keystrings.remove(keys[i].data());
-      continue;
+  {
+    // deserialize flow files with missing content claim
+    std::vector<rocksdb::Slice> keys;
+    std::vector<std::list<ExpiredFlowFileInfo>::iterator> key_positions;
+    for (auto it = flow_files.begin(); it != flow_files.end(); ++it) {
+      if (!it->content) {
+        keys.push_back(it->key);
+        key_positions.push_back(it);
+      }
     }
+    if (!keys.empty()) {
+      std::vector<std::string> values;
+      auto multistatus = opendb->MultiGet(options, keys, &values);
 
-    utils::Identifier containerId;
-    auto eventRead = FlowFileRecord::DeSerialize(gsl::make_span(values[i]).as_span<const std::byte>(), content_repo_, containerId);
-    if (eventRead) {
-      purgeList.push_back(eventRead);
+      for (size_t i = 0; i < keys.size() && i < values.size() && i < multistatus.size(); ++i) {
+        if (!multistatus[i].ok()) {
+          logger_->log_error("Failed to read key from rocksdb: %s! DB is most probably in an inconsistent state!", keys[i].data());
+          flow_files.erase(key_positions.at(i));
+          continue;
+        }
+
+        utils::Identifier containerId;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org