You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/09/02 09:12:01 UTC

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #887: MINIFICPP-1340 - Batch persist content changes

fgerlits commented on a change in pull request #887:
URL: https://github.com/apache/nifi-minifi-cpp/pull/887#discussion_r481866246



##########
File path: extensions/rocksdb-repos/DatabaseContentRepository.cpp
##########
@@ -64,13 +65,53 @@ void DatabaseContentRepository::stop() {
   db_.reset();
 }
 
+DatabaseContentRepository::Session::Session(std::shared_ptr<ContentRepository> repository) : ContentSession(std::move(repository)) {}
+
+std::shared_ptr<ContentSession> DatabaseContentRepository::createSession() {
+  return std::make_shared<Session>(shared_from_this());
+}
+
+void DatabaseContentRepository::Session::commit() {
+  auto dbContentRepository = std::static_pointer_cast<DatabaseContentRepository>(repository_);
+  auto opendb = dbContentRepository->db_->open();
+  if (!opendb) {
+    throw Exception(GENERAL_EXCEPTION, "Couldn't open rocksdb database to commit content changes");
+  }
+  rocksdb::WriteBatch batch;
+  for (const auto& resource : managedResources_) {
+    auto outStream = dbContentRepository->write(resource.first, false, &batch);
+    if (outStream == nullptr) {
+      throw Exception(GENERAL_EXCEPTION, "Couldn't open the underlying resource for write: " + resource.first->getContentFullPath());
+    }
+    const auto size = resource.second->getSize();
+    if (outStream->write(const_cast<uint8_t*>(resource.second->getBuffer()), size) != size) {
+      throw Exception(GENERAL_EXCEPTION, "Failed to write new resource: " + resource.first->getContentFullPath());
+    }
+  }
+  for (const auto& resource : extendedResources_) {
+    auto outStream = dbContentRepository->write(resource.first, true, &batch);
+    if (outStream == nullptr) {
+      throw Exception(GENERAL_EXCEPTION, "Couldn't open the underlying resource for append: " + resource.first->getContentFullPath());
+    }
+    const auto size = resource.second->getSize();
+    if (outStream->write(const_cast<uint8_t*>(resource.second->getBuffer()), size) != size) {
+      throw Exception(GENERAL_EXCEPTION, "Failed to append to resource: " + resource.first->getContentFullPath());
+    }
+  }
+
+  rocksdb::WriteOptions options;
+  options.sync = true;
+  rocksdb::Status status = opendb->Write(options, &batch);
+  if (!status.ok()) {
+    throw std::runtime_error("Batch write failed: " + status.ToString());

Review comment:
       Why is this a `std::runtime_error` and not a `minifi::Exception`, like the other errors thrown in this function?

##########
File path: libminifi/include/utils/GeneralUtils.h
##########
@@ -68,6 +68,21 @@ using void_t = void;
 using std::void_t;
 #endif /* < C++17 */
 
+namespace internal {
+
+struct safe_enable_shared_from_this_base : std::enable_shared_from_this<safe_enable_shared_from_this_base> {

Review comment:
       What is the purpose of this hidden base class?  Asking for a friend.

##########
File path: libminifi/test/archive-tests/MergeFileTests.cpp
##########
@@ -330,10 +306,8 @@ TEST_CASE("MergeFileDefragmentDelimiter", "[mergefiletest2]") {
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Demarcator, DEMARCATOR_FILE);
 
   core::ProcessSession sessionGenFlowFile(context);
-  std::shared_ptr<core::FlowFile> record[6];
-
   // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-  for (int i = 0; i < 6; i++) {
+  for (const int i : {0, 2, 5, 4, 1, 3}) {

Review comment:
       Is this a particular order, or just random?

##########
File path: extensions/rocksdb-repos/DatabaseContentRepository.cpp
##########
@@ -64,13 +65,53 @@ void DatabaseContentRepository::stop() {
   db_.reset();
 }
 
+DatabaseContentRepository::Session::Session(std::shared_ptr<ContentRepository> repository) : ContentSession(std::move(repository)) {}
+
+std::shared_ptr<ContentSession> DatabaseContentRepository::createSession() {
+  return std::make_shared<Session>(shared_from_this());
+}
+
+void DatabaseContentRepository::Session::commit() {
+  auto dbContentRepository = std::static_pointer_cast<DatabaseContentRepository>(repository_);
+  auto opendb = dbContentRepository->db_->open();
+  if (!opendb) {
+    throw Exception(GENERAL_EXCEPTION, "Couldn't open rocksdb database to commit content changes");

Review comment:
       It might be worth creating a new Exception type for this, e.g. `DATABASE_EXCEPTION`.

##########
File path: libminifi/test/archive-tests/MergeFileTests.cpp
##########
@@ -408,22 +376,14 @@ TEST_CASE("MergeFileDefragmentDropFlow", "[mergefiletest3]") {
     }
   }
 
-  MergeTestController testController;
-  auto context = testController.context;
-  auto processor = testController.processor;
-  auto input = testController.input;
-  auto output = testController.output;
-
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MaxBinAge, "1 sec");
 
   core::ProcessSession sessionGenFlowFile(context);
-  std::shared_ptr<core::FlowFile> record[6];
-
   // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-  for (int i = 0; i < 6; i++) {
+  for (const int i : {0, 2, 5, 1, 3}) {
     if (i == 4)

Review comment:
       Remove this `if (i == 4)` check (4 is no longer in the list being iterated), and update the comment above to something like `// Generate 5 flow files [...]`?

##########
File path: libminifi/include/core/repository/VolatileContentRepository.h
##########
@@ -41,7 +43,11 @@ namespace repository {
  * Purpose: Stages content into a volatile area of memory. Note that   when the maximum number
  * of entries is consumed we will rollback a session to wait for others to be freed.
  */
-class VolatileContentRepository : public core::ContentRepository, public virtual core::repository::VolatileRepository<std::shared_ptr<minifi::ResourceClaim>> {
+class VolatileContentRepository :
+    public core::ContentRepository,
+    public core::repository::VolatileRepository<std::shared_ptr<minifi::ResourceClaim>>,

Review comment:
       Why is `virtual` no longer needed?

##########
File path: libminifi/include/core/ContentSession.h
##########
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include "ResourceClaim.h"
+#include "io/BaseStream.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+class ContentRepository;
+
+class ContentSession {
+ public:
+  explicit ContentSession(std::shared_ptr<ContentRepository> repository);
+
+  std::shared_ptr<ResourceClaim> create();
+
+  std::shared_ptr<io::BaseStream> write(const std::shared_ptr<ResourceClaim>& resourceId, bool append = false);

Review comment:
       I think an `{OVERWRITE, APPEND}` enum would be better than a `bool`

##########
File path: libminifi/include/utils/GeneralUtils.h
##########
@@ -68,6 +68,21 @@ using void_t = void;
 using std::void_t;
 #endif /* < C++17 */
 
+namespace internal {
+
+struct safe_enable_shared_from_this_base : std::enable_shared_from_this<safe_enable_shared_from_this_base> {
+  virtual ~safe_enable_shared_from_this_base() = default;
+};
+
+}  // namespace internal
+
+template<typename T>
+struct safe_enable_shared_from_this : virtual internal::safe_enable_shared_from_this_base {
+  std::shared_ptr<T> shared_from_this() {

Review comment:
       To me, the old name `shared_from_parent` was more expressive, as well as making it clear that this is a custom function, not the one from the standard library.  Maybe call them `enable_shared_from_parent<>` and `shared_from_parent()`?

##########
File path: libminifi/src/io/DataStream.cpp
##########
@@ -33,7 +33,9 @@ namespace io {
 int DataStream::writeData(uint8_t *value, int size) {
   if (value == nullptr)
     return 0;
-  std::copy(value, value + size, std::back_inserter(buffer));
+  std::size_t previous_size = buffer.size();
+  buffer.resize(previous_size + size);
+  std::memcpy(buffer.data() + previous_size, value, size);

Review comment:
       :+1:
   When I was looking into the performance of CWEL, this function was a major bottleneck.  This should make it faster.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org