You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/06/14 07:27:38 UTC

[GitHub] [nifi-minifi-cpp] adam-markovics opened a new pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

adam-markovics opened a new pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107


   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically main)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r746475581



##########
File path: libminifi/include/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h
##########
@@ -66,8 +76,11 @@ class AbstractCoreComponentStateManagerProvider : public std::enable_shared_from
   virtual bool removeImpl(const utils::Identifier& key) = 0;
   virtual bool persistImpl() = 0;
 
-  virtual std::string serialize(const core::CoreComponentState& kvs);
-  bool deserialize(const std::string& serialized, core::CoreComponentState& kvs);
+ private:
+  void removeFromCache(utils::Identifier id);
+
+  std::mutex mutex;

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
lordgamez commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r746369902



##########
File path: extensions/windows-event-log/tests/BookmarkTests.cpp
##########
@@ -41,7 +41,8 @@ std::unique_ptr<Bookmark> createBookmark(TestPlan &test_plan,
                                          const utils::Identifier &uuid = IdGenerator::getIdGenerator()->generate()) {
   const auto state_manager = test_plan.getStateManagerProvider()->getCoreComponentStateManager(uuid);
   const auto logger = test_plan.getLogger();
-  return std::make_unique<Bookmark>(channel, L"*", "", uuid, false, state_manager, logger);
+  auto bookmark = std::make_unique<Bookmark>(channel, L"*", "", uuid, false, state_manager, logger);
+  return bookmark;

Review comment:
       What was the reason for this change?

##########
File path: extensions/coap/tests/CoapIntegrationBase.h
##########
@@ -37,7 +37,7 @@ int ssl_enable(void *, void *) {
 
 class CoapIntegrationBase : public IntegrationBase {
  public:
-  explicit CoapIntegrationBase(uint64_t waitTime = 5000)
+  explicit CoapIntegrationBase(std::chrono::milliseconds waitTime = std::chrono::milliseconds(5000))

Review comment:
       You could use chrono literals instead. The same comment should apply to all the std::chrono usages in this PR.

##########
File path: libminifi/include/core/ProcessContext.h
##########
@@ -362,6 +369,8 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
   controller::ControllerServiceProvider* controller_service_provider_;
   // state manager provider
   std::shared_ptr<core::CoreComponentStateManagerProvider> state_manager_provider_;
+  // state manager
+  std::shared_ptr<CoreComponentStateManager> stateManager_;

Review comment:
       Use snake_case_ here as well (valid for any further member variables in the PR), also I think these member comments are unnecessary as they do not contain any useful info.

##########
File path: libminifi/include/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h
##########
@@ -34,29 +34,39 @@ namespace controllers {
 class AbstractCoreComponentStateManagerProvider : public std::enable_shared_from_this<AbstractCoreComponentStateManagerProvider>,
                                                    public core::CoreComponentStateManagerProvider {
  public:
-  ~AbstractCoreComponentStateManagerProvider() override;
-
   std::shared_ptr<core::CoreComponentStateManager> getCoreComponentStateManager(const utils::Identifier& uuid) override;
-
   std::map<utils::Identifier, std::unordered_map<std::string, std::string>> getAllCoreComponentStates() override;
 
   class AbstractCoreComponentStateManager : public core::CoreComponentStateManager{
    public:
     AbstractCoreComponentStateManager(std::shared_ptr<AbstractCoreComponentStateManagerProvider> provider, const utils::Identifier& id);
 
-    bool set(const core::CoreComponentState& kvs) override;
+    ~AbstractCoreComponentStateManager() override;
 
+    bool set(const core::CoreComponentState& kvs) override;
     bool get(core::CoreComponentState& kvs) override;
-
     bool clear() override;
-
     bool persist() override;
 
+    bool isTransactionInProgress() const override;
+    bool beginTransaction() override;
+    bool commit() override;
+    bool rollback() override;
+
    private:
+    enum class ChangeType {
+      NONE,
+      SET,
+      CLEAR
+    };
+
     std::shared_ptr<AbstractCoreComponentStateManagerProvider> provider_;
     utils::Identifier id_;
     bool state_valid_;
     core::CoreComponentState state_;
+    bool transactionInProgress_;
+    ChangeType changeType_;
+    core::CoreComponentState stateToSet_;

Review comment:
       Use snake_case_ format for member variables

##########
File path: libminifi/include/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h
##########
@@ -66,8 +76,11 @@ class AbstractCoreComponentStateManagerProvider : public std::enable_shared_from
   virtual bool removeImpl(const utils::Identifier& key) = 0;
   virtual bool persistImpl() = 0;
 
-  virtual std::string serialize(const core::CoreComponentState& kvs);
-  bool deserialize(const std::string& serialized, core::CoreComponentState& kvs);
+ private:
+  void removeFromCache(utils::Identifier id);
+
+  std::mutex mutex;

Review comment:
       Use "_" suffix for member




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r677390804



##########
File path: extensions/windows-event-log/tests/BookmarkTests.cpp
##########
@@ -40,7 +40,10 @@ std::unique_ptr<Bookmark> createBookmark(TestPlan &test_plan,
                                          const utils::Identifier &uuid = IdGenerator::getIdGenerator()->generate()) {
   const auto state_manager = test_plan.getStateManagerProvider()->getCoreComponentStateManager(uuid);
   const auto logger = test_plan.getLogger();
-  return utils::make_unique<Bookmark>(channel, L"*", "", uuid, false, state_manager, logger);
+  state_manager->beginTransaction();
+  auto bookmark = utils::make_unique<Bookmark>(channel, L"*", "", uuid, false, state_manager, logger);
+  state_manager->commit();

Review comment:
       A conclusion was made that state changes would be autocommit (SQL-like) when outside of a transaction (such as in onTrigger). A transaction would begin and at the end of the call it would finish with commit. If users want to, they can still begin and commit/rollback a transaction manually if they wish so.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r684137268



##########
File path: cmake/BuildTests.cmake
##########
@@ -151,11 +151,13 @@ SET(INT_TEST_COUNT 0)
 FOREACH(testfile ${INTEGRATION_TESTS})
   get_filename_component(testfilename "${testfile}" NAME_WE)
   add_executable("${testfilename}" "${TEST_DIR}/integration/${testfile}")
+  target_wholearchive_library("${testfilename}" minifi-standard-processors)

Review comment:
       The same whole archive linking is done for unit tests of standard processors, nanofi tests and many other tests.
   The main minifiexe binary is also linking it through linking all the extensions (including standard processors) in main/CMakeLists.txt:72:
   `target_wholearchive_library(minifiexe ${EXTENSION})`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r677141249



##########
File path: extensions/windows-event-log/tests/BookmarkTests.cpp
##########
@@ -40,7 +40,10 @@ std::unique_ptr<Bookmark> createBookmark(TestPlan &test_plan,
                                          const utils::Identifier &uuid = IdGenerator::getIdGenerator()->generate()) {
   const auto state_manager = test_plan.getStateManagerProvider()->getCoreComponentStateManager(uuid);
   const auto logger = test_plan.getLogger();
-  return utils::make_unique<Bookmark>(channel, L"*", "", uuid, false, state_manager, logger);
+  state_manager->beginTransaction();
+  auto bookmark = utils::make_unique<Bookmark>(channel, L"*", "", uuid, false, state_manager, logger);
+  state_manager->commit();

Review comment:
       If you want to make a change to the state (set or clear) without being in a transaction, your call will fail, returning false.
   I don't think it is possible to auto-commit outside of onTrigger. It is the user's responsibility to decide when/how often/if to commit or rollback. I cannot think of a way that is universally applicable everywhere.
   Transaction objects wouldn't solve this problem, and they would break preexisting interface of state manager, affecting all users, even in onTriggers.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm closed pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
szaszm closed pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r725025330



##########
File path: libminifi/test/StatefulProcessor.h
##########
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+#include <vector>
+#include "core/Processor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class StatefulProcessor : public core::Processor {
+ public:
+  using core::Processor::Processor;
+
+  using HookType = std::function<void(core::CoreComponentStateManager&)>;
+  using HookListType = std::vector<HookType>;
+
+  void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) override;
+
+  void onTrigger(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>&) override;
+
+  void setHooks(HookType onScheduleHook, HookListType onTriggerHooks);
+
+  bool hasFinishedHooks() const;

Review comment:
       Good suggestion, now possible thanks to the new standard.

##########
File path: libminifi/test/integration/StateTransactionalityTests.cpp
##########
@@ -0,0 +1,614 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+
+#include <iostream>
+#include "IntegrationBase.h"
+#include "../StatefulProcessor.h"
+#include "../TestBase.h"
+#include "utils/IntegrationTestUtils.h"
+#include "core/state/ProcessorController.h"
+
+using org::apache::nifi::minifi::processors::StatefulProcessor;
+using org::apache::nifi::minifi::state::ProcessorController;
+
+namespace {
+using LogChecker = std::function<bool()>;
+
+struct HookCollection {
+  StatefulProcessor::HookType onScheduleHook_;
+  StatefulProcessor::HookListType onTriggerHooks_;
+  LogChecker logChecker_;
+};
+
+class StatefulIntegrationTest : public IntegrationBase {
+ public:
+  explicit StatefulIntegrationTest(std::string testCase, HookCollection hookCollection)
+    : onScheduleHook_(std::move(hookCollection.onScheduleHook_))
+    , onTriggerHooks_(std::move(hookCollection.onTriggerHooks_))
+    , logChecker_(hookCollection.logChecker_)
+    , testCase_(std::move(testCase)) {
+  }
+
+  void testSetup() override {
+    LogTestController::getInstance().reset();
+    LogTestController::getInstance().setDebug<core::ProcessGroup>();
+    LogTestController::getInstance().setDebug<core::Processor>();
+    LogTestController::getInstance().setDebug<core::ProcessSession>();
+    LogTestController::getInstance().setDebug<StatefulIntegrationTest>();
+    logger_->log_info("Running test case \"%s\"", testCase_);
+  }
+
+  void updateProperties(std::shared_ptr<minifi::FlowController> fc) override {
+    const auto controllerVec = fc->getAllComponents();
+    /* This tests depends on a configuration that contains only one StatefulProcessor named statefulProcessor
+     * (See TestStateTransactionality.yml)
+     * In this case there are two components in the flowcontroller: first is the controller itself,
+     * second is the processor that the test uses.
+     * Added here some assertions to make it clear. In case any of these fail without changing the corresponding yml file,
+     * that most probably means a breaking change. */
+    assert(controllerVec.size() == 2);
+    assert(controllerVec[0]->getComponentName() == "FlowController");
+    assert(controllerVec[1]->getComponentName() == "statefulProcessor");
+
+    // set hooks
+    const auto processController = std::dynamic_pointer_cast<ProcessorController>(controllerVec[1]);
+    assert(processController != nullptr);
+    statefulProcessor_ = std::dynamic_pointer_cast<StatefulProcessor>(processController->getProcessor());
+    assert(statefulProcessor_ != nullptr);
+    statefulProcessor_->setHooks(onScheduleHook_, onTriggerHooks_);
+  }
+
+  void runAssertions() override {
+    using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
+    assert(verifyEventHappenedInPollTime(std::chrono::milliseconds(wait_time_), [&] {
+      return statefulProcessor_->hasFinishedHooks() && logChecker_();
+    }));
+  }
+
+ private:
+  const StatefulProcessor::HookType onScheduleHook_;
+  const StatefulProcessor::HookListType onTriggerHooks_;
+  const LogChecker logChecker_;
+  const std::string testCase_;
+  std::shared_ptr<StatefulProcessor> statefulProcessor_;
+  std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<StatefulIntegrationTest>::getLogger()};
+};
+
+const std::unordered_map<std::string, std::string> exampleState{{"key1", "value1"}, {"key2", "value2"}};
+const std::unordered_map<std::string, std::string> exampleState2{{"key3", "value3"}, {"key4", "value4"}};
+
+auto standardLogChecker = [] {
+  const std::string logs = LogTestController::getInstance().log_output.str();
+  const auto errorResult = utils::StringUtils::countOccurrences(logs, "[error]");
+  const auto warningResult = utils::StringUtils::countOccurrences(logs, "[warning]");
+  return errorResult.second == 0 && warningResult.second == 0;
+};
+
+auto commitAndRollbackWarnings = [] {
+  const std::string logs = LogTestController::getInstance().log_output.str();
+  const auto errorResult = utils::StringUtils::countOccurrences(logs, "[error]");
+  const auto commitWarningResult = utils::StringUtils::countOccurrences(logs, "[warning] Caught \"Process Session Operation: State manager commit failed.\"");
+  const auto rollbackWarningResult = utils::StringUtils::countOccurrences(logs,
+    "[warning] Caught Exception during process session rollback: Process Session Operation: State manager rollback failed.");
+  return errorResult.second == 0 && commitWarningResult.second == 1 && rollbackWarningResult.second == 1;
+};
+
+auto exceptionRollbackWarnings = [] {
+  const std::string logs = LogTestController::getInstance().log_output.str();
+  const auto errorResult = utils::StringUtils::countOccurrences(logs, "[error]");
+  const auto exceptionWarningResult = utils::StringUtils::countOccurrences(logs, "[warning] Caught \"Triggering rollback\"");
+  const auto rollbackWarningResult = utils::StringUtils::countOccurrences(logs, "[warning] ProcessSession rollback for statefulProcessor executed");
+  return errorResult.second == 0 && exceptionWarningResult.second == 1 && rollbackWarningResult.second == 1;
+};
+
+const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
+  {"State_is_recorded_after_committing", {
+    {},
+    {
+     [] (core::CoreComponentStateManager& stateManager) {
+       assert(stateManager.set(exampleState));
+     },
+     [] (core::CoreComponentStateManager& stateManager) {
+       std::unordered_map<std::string, std::string> state;
+       assert(stateManager.get(state));
+       assert(state == exampleState);
+     }
+  },
+  standardLogChecker
+  }},
+  {"State_is_discarded_after_rolling_back", {
+     {},
+     {
+     [](core::CoreComponentStateManager& stateManager) {
+       assert(stateManager.set(exampleState));
+     },
+     [](core::CoreComponentStateManager& stateManager) {
+       assert(stateManager.set(exampleState2));
+       throw std::runtime_error("Triggering rollback");
+     },
+     [](core::CoreComponentStateManager& stateManager) {
+       std::unordered_map<std::string, std::string> state;
+       assert(stateManager.get(state));
+       assert(state == exampleState);
+     }
+  },
+  exceptionRollbackWarnings
+  }},
+  {
+    "Get_in_onSchedule_without_previous_state", {
+    [](core::CoreComponentStateManager& stateManager) {
+      std::unordered_map<std::string, std::string> state;
+      assert(!stateManager.get(state));
+      assert(state.empty());
+    },
+    {},
+    standardLogChecker
+    }
+  },
+  {
+    "Set_in_onSchedule", {
+    [](core::CoreComponentStateManager& stateManager) {
+      assert(stateManager.set(exampleState));
+    },
+    {
+     [](core::CoreComponentStateManager& stateManager) {
+       std::unordered_map<std::string, std::string> state;
+       assert(stateManager.get(state));
+       assert(state == exampleState);
+     }
+    },
+    standardLogChecker
+    }
+  },
+  {
+    "Clear_in_onSchedule", {
+    [](core::CoreComponentStateManager& stateManager) {
+      assert(!stateManager.clear());
+      assert(stateManager.set(exampleState));
+      assert(stateManager.clear());
+    },
+    {
+     [](core::CoreComponentStateManager& stateManager) {
+       std::unordered_map<std::string, std::string> state;
+       assert(!stateManager.get(state));
+      assert(state.empty());
+     }
+    },
+    standardLogChecker
+    },
+  },
+  {
+    "Persist_in_onSchedule", {
+    {
+      [](core::CoreComponentStateManager& stateManager) {
+        assert(stateManager.persist());
+      }
+      },

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r676610424



##########
File path: extensions/windows-event-log/tests/BookmarkTests.cpp
##########
@@ -40,7 +40,10 @@ std::unique_ptr<Bookmark> createBookmark(TestPlan &test_plan,
                                          const utils::Identifier &uuid = IdGenerator::getIdGenerator()->generate()) {
   const auto state_manager = test_plan.getStateManagerProvider()->getCoreComponentStateManager(uuid);
   const auto logger = test_plan.getLogger();
-  return utils::make_unique<Bookmark>(channel, L"*", "", uuid, false, state_manager, logger);
+  state_manager->beginTransaction();
+  auto bookmark = utils::make_unique<Bookmark>(channel, L"*", "", uuid, false, state_manager, logger);
+  state_manager->commit();

Review comment:
       Could we somehow make this easier to use correctly and hard/impossible to use incorrectly? I'm thinking about transaction objects having state mutating capabilities, and the state manager only handing them out. onTrigger could get one as an argument.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r746495897



##########
File path: libminifi/include/core/ProcessContext.h
##########
@@ -362,6 +369,8 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
   controller::ControllerServiceProvider* controller_service_provider_;
   // state manager provider
   std::shared_ptr<core::CoreComponentStateManagerProvider> state_manager_provider_;
+  // state manager
+  std::shared_ptr<CoreComponentStateManager> stateManager_;

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r672437833



##########
File path: extensions/sql/processors/QueryDatabaseTable.cpp
##########
@@ -220,7 +219,9 @@ void QueryDatabaseTable::initializeMaxValues(core::ProcessContext &context) {
     logger_->log_info("Found no stored state");
   } else {
     if (!loadMaxValuesFromStoredState(stored_state)) {
+      state_manager_->beginTransaction();
       state_manager_->clear();
+      state_manager_->commit();

Review comment:
       What are the new semantics that make these extra calls necessary?

##########
File path: cmake/BuildTests.cmake
##########
@@ -151,11 +151,13 @@ SET(INT_TEST_COUNT 0)
 FOREACH(testfile ${INTEGRATION_TESTS})
   get_filename_component(testfilename "${testfile}" NAME_WE)
   add_executable("${testfilename}" "${TEST_DIR}/integration/${testfile}")
+  target_wholearchive_library("${testfilename}" minifi-standard-processors)

Review comment:
       What changed that makes this link necessary?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r724963981



##########
File path: libminifi/test/StatefulProcessor.h
##########
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+#include <vector>
+#include "core/Processor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class StatefulProcessor : public core::Processor {
+ public:
+  using core::Processor::Processor;
+
+  using HookType = std::function<void(core::CoreComponentStateManager&)>;
+  using HookListType = std::vector<HookType>;
+
+  void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) override;
+
+  void onTrigger(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>&) override;
+
+  void setHooks(HookType onScheduleHook, HookListType onTriggerHooks);
+
+  bool hasFinishedHooks() const;

Review comment:
       Consider `[[nodiscard]]`.

##########
File path: libminifi/test/StatefulProcessor.h
##########
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+#include <vector>
+#include "core/Processor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class StatefulProcessor : public core::Processor {
+ public:
+  using core::Processor::Processor;
+
+  using HookType = std::function<void(core::CoreComponentStateManager&)>;
+  using HookListType = std::vector<HookType>;

Review comment:
       I think this type alias (`HookListType`) hurts more than it helps readability. The upside is helping readability by not having to read long complex types when they don't matter and providing more specific description of the use case. The downside is not having the type information readily available when it matters and the usage is obvious.
   
   I'm neutral on `HookType`: I wouldn't have introduced it myself, but I don't mind it.

##########
File path: libminifi/test/integration/StateTransactionalityTests.cpp
##########
@@ -0,0 +1,614 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+
+#include <iostream>
+#include "IntegrationBase.h"
+#include "../StatefulProcessor.h"
+#include "../TestBase.h"
+#include "utils/IntegrationTestUtils.h"
+#include "core/state/ProcessorController.h"
+
+using org::apache::nifi::minifi::processors::StatefulProcessor;
+using org::apache::nifi::minifi::state::ProcessorController;
+
+namespace {
+using LogChecker = std::function<bool()>;
+
+struct HookCollection {
+  StatefulProcessor::HookType onScheduleHook_;
+  StatefulProcessor::HookListType onTriggerHooks_;
+  LogChecker logChecker_;
+};
+
+class StatefulIntegrationTest : public IntegrationBase {
+ public:
+  explicit StatefulIntegrationTest(std::string testCase, HookCollection hookCollection)
+    : onScheduleHook_(std::move(hookCollection.onScheduleHook_))
+    , onTriggerHooks_(std::move(hookCollection.onTriggerHooks_))
+    , logChecker_(hookCollection.logChecker_)
+    , testCase_(std::move(testCase)) {
+  }
+
+  void testSetup() override {
+    LogTestController::getInstance().reset();
+    LogTestController::getInstance().setDebug<core::ProcessGroup>();
+    LogTestController::getInstance().setDebug<core::Processor>();
+    LogTestController::getInstance().setDebug<core::ProcessSession>();
+    LogTestController::getInstance().setDebug<StatefulIntegrationTest>();
+    logger_->log_info("Running test case \"%s\"", testCase_);
+  }
+
+  void updateProperties(std::shared_ptr<minifi::FlowController> fc) override {
+    const auto controllerVec = fc->getAllComponents();
+    /* This tests depends on a configuration that contains only one StatefulProcessor named statefulProcessor
+     * (See TestStateTransactionality.yml)
+     * In this case there are two components in the flowcontroller: first is the controller itself,
+     * second is the processor that the test uses.
+     * Added here some assertions to make it clear. In case any of these fail without changing the corresponding yml file,
+     * that most probably means a breaking change. */
+    assert(controllerVec.size() == 2);
+    assert(controllerVec[0]->getComponentName() == "FlowController");
+    assert(controllerVec[1]->getComponentName() == "statefulProcessor");
+
+    // set hooks
+    const auto processController = std::dynamic_pointer_cast<ProcessorController>(controllerVec[1]);
+    assert(processController != nullptr);
+    statefulProcessor_ = std::dynamic_pointer_cast<StatefulProcessor>(processController->getProcessor());
+    assert(statefulProcessor_ != nullptr);
+    statefulProcessor_->setHooks(onScheduleHook_, onTriggerHooks_);
+  }
+
+  void runAssertions() override {
+    using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
+    assert(verifyEventHappenedInPollTime(std::chrono::milliseconds(wait_time_), [&] {
+      return statefulProcessor_->hasFinishedHooks() && logChecker_();
+    }));
+  }
+
+ private:
+  const StatefulProcessor::HookType onScheduleHook_;
+  const StatefulProcessor::HookListType onTriggerHooks_;
+  const LogChecker logChecker_;
+  const std::string testCase_;
+  std::shared_ptr<StatefulProcessor> statefulProcessor_;
+  std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<StatefulIntegrationTest>::getLogger()};
+};
+
+const std::unordered_map<std::string, std::string> exampleState{{"key1", "value1"}, {"key2", "value2"}};
+const std::unordered_map<std::string, std::string> exampleState2{{"key3", "value3"}, {"key4", "value4"}};
+
+auto standardLogChecker = [] {
+  const std::string logs = LogTestController::getInstance().log_output.str();
+  const auto errorResult = utils::StringUtils::countOccurrences(logs, "[error]");
+  const auto warningResult = utils::StringUtils::countOccurrences(logs, "[warning]");
+  return errorResult.second == 0 && warningResult.second == 0;
+};
+
+auto commitAndRollbackWarnings = [] {
+  const std::string logs = LogTestController::getInstance().log_output.str();
+  const auto errorResult = utils::StringUtils::countOccurrences(logs, "[error]");
+  const auto commitWarningResult = utils::StringUtils::countOccurrences(logs, "[warning] Caught \"Process Session Operation: State manager commit failed.\"");
+  const auto rollbackWarningResult = utils::StringUtils::countOccurrences(logs,
+    "[warning] Caught Exception during process session rollback: Process Session Operation: State manager rollback failed.");
+  return errorResult.second == 0 && commitWarningResult.second == 1 && rollbackWarningResult.second == 1;
+};
+
+auto exceptionRollbackWarnings = [] {
+  const std::string logs = LogTestController::getInstance().log_output.str();
+  const auto errorResult = utils::StringUtils::countOccurrences(logs, "[error]");
+  const auto exceptionWarningResult = utils::StringUtils::countOccurrences(logs, "[warning] Caught \"Triggering rollback\"");
+  const auto rollbackWarningResult = utils::StringUtils::countOccurrences(logs, "[warning] ProcessSession rollback for statefulProcessor executed");
+  return errorResult.second == 0 && exceptionWarningResult.second == 1 && rollbackWarningResult.second == 1;
+};
+
+const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
+  {"State_is_recorded_after_committing", {
+    {},
+    {
+     [] (core::CoreComponentStateManager& stateManager) {
+       assert(stateManager.set(exampleState));
+     },
+     [] (core::CoreComponentStateManager& stateManager) {
+       std::unordered_map<std::string, std::string> state;
+       assert(stateManager.get(state));
+       assert(state == exampleState);
+     }
+  },
+  standardLogChecker
+  }},
+  {"State_is_discarded_after_rolling_back", {
+     {},
+     {
+     [](core::CoreComponentStateManager& stateManager) {
+       assert(stateManager.set(exampleState));
+     },
+     [](core::CoreComponentStateManager& stateManager) {
+       assert(stateManager.set(exampleState2));
+       throw std::runtime_error("Triggering rollback");
+     },
+     [](core::CoreComponentStateManager& stateManager) {
+       std::unordered_map<std::string, std::string> state;
+       assert(stateManager.get(state));
+       assert(state == exampleState);
+     }
+  },
+  exceptionRollbackWarnings
+  }},
+  {
+    "Get_in_onSchedule_without_previous_state", {
+    [](core::CoreComponentStateManager& stateManager) {
+      std::unordered_map<std::string, std::string> state;
+      assert(!stateManager.get(state));
+      assert(state.empty());
+    },
+    {},
+    standardLogChecker
+    }
+  },
+  {
+    "Set_in_onSchedule", {
+    [](core::CoreComponentStateManager& stateManager) {
+      assert(stateManager.set(exampleState));
+    },
+    {
+     [](core::CoreComponentStateManager& stateManager) {
+       std::unordered_map<std::string, std::string> state;
+       assert(stateManager.get(state));
+       assert(state == exampleState);
+     }
+    },
+    standardLogChecker
+    }
+  },
+  {
+    "Clear_in_onSchedule", {
+    [](core::CoreComponentStateManager& stateManager) {
+      assert(!stateManager.clear());
+      assert(stateManager.set(exampleState));
+      assert(stateManager.clear());
+    },
+    {
+     [](core::CoreComponentStateManager& stateManager) {
+       std::unordered_map<std::string, std::string> state;
+       assert(!stateManager.get(state));
+      assert(state.empty());
+     }
+    },
+    standardLogChecker
+    },
+  },
+  {
+    "Persist_in_onSchedule", {
+    {
+      [](core::CoreComponentStateManager& stateManager) {
+        assert(stateManager.persist());
+      }
+      },

Review comment:
       We have strange indentation here.

##########
File path: libminifi/test/integration/StateTransactionalityTests.cpp
##########
@@ -0,0 +1,614 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+
+#include <iostream>
+#include "IntegrationBase.h"
+#include "../StatefulProcessor.h"
+#include "../TestBase.h"
+#include "utils/IntegrationTestUtils.h"
+#include "core/state/ProcessorController.h"
+
+using org::apache::nifi::minifi::processors::StatefulProcessor;
+using org::apache::nifi::minifi::state::ProcessorController;
+
+namespace {
+using LogChecker = std::function<bool()>;
+
+struct HookCollection {
+  StatefulProcessor::HookType onScheduleHook_;
+  StatefulProcessor::HookListType onTriggerHooks_;
+  LogChecker logChecker_;
+};
+
+class StatefulIntegrationTest : public IntegrationBase {
+ public:
+  explicit StatefulIntegrationTest(std::string testCase, HookCollection hookCollection)
+    : onScheduleHook_(std::move(hookCollection.onScheduleHook_))
+    , onTriggerHooks_(std::move(hookCollection.onTriggerHooks_))
+    , logChecker_(hookCollection.logChecker_)
+    , testCase_(std::move(testCase)) {
+  }
+
+  void testSetup() override {
+    LogTestController::getInstance().reset();
+    LogTestController::getInstance().setDebug<core::ProcessGroup>();
+    LogTestController::getInstance().setDebug<core::Processor>();
+    LogTestController::getInstance().setDebug<core::ProcessSession>();
+    LogTestController::getInstance().setDebug<StatefulIntegrationTest>();
+    logger_->log_info("Running test case \"%s\"", testCase_);
+  }
+
+  void updateProperties(std::shared_ptr<minifi::FlowController> fc) override {
+    const auto controllerVec = fc->getAllComponents();
+    /* This tests depends on a configuration that contains only one StatefulProcessor named statefulProcessor
+     * (See TestStateTransactionality.yml)
+     * In this case there are two components in the flowcontroller: first is the controller itself,
+     * second is the processor that the test uses.
+     * Added here some assertions to make it clear. In case any of these fail without changing the corresponding yml file,
+     * that most probably means a breaking change. */
+    assert(controllerVec.size() == 2);
+    assert(controllerVec[0]->getComponentName() == "FlowController");
+    assert(controllerVec[1]->getComponentName() == "statefulProcessor");
+
+    // set hooks
+    const auto processController = std::dynamic_pointer_cast<ProcessorController>(controllerVec[1]);
+    assert(processController != nullptr);
+    statefulProcessor_ = std::dynamic_pointer_cast<StatefulProcessor>(processController->getProcessor());
+    assert(statefulProcessor_ != nullptr);
+    statefulProcessor_->setHooks(onScheduleHook_, onTriggerHooks_);
+  }
+
+  void runAssertions() override {
+    using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
+    assert(verifyEventHappenedInPollTime(std::chrono::milliseconds(wait_time_), [&] {
+      return statefulProcessor_->hasFinishedHooks() && logChecker_();
+    }));
+  }
+
+ private:
+  const StatefulProcessor::HookType onScheduleHook_;
+  const StatefulProcessor::HookListType onTriggerHooks_;
+  const LogChecker logChecker_;
+  const std::string testCase_;
+  std::shared_ptr<StatefulProcessor> statefulProcessor_;
+  std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<StatefulIntegrationTest>::getLogger()};
+};
+
+const std::unordered_map<std::string, std::string> exampleState{{"key1", "value1"}, {"key2", "value2"}};
+const std::unordered_map<std::string, std::string> exampleState2{{"key3", "value3"}, {"key4", "value4"}};
+
+auto standardLogChecker = [] {
+  const std::string logs = LogTestController::getInstance().log_output.str();
+  const auto errorResult = utils::StringUtils::countOccurrences(logs, "[error]");
+  const auto warningResult = utils::StringUtils::countOccurrences(logs, "[warning]");
+  return errorResult.second == 0 && warningResult.second == 0;
+};
+
+auto commitAndRollbackWarnings = [] {
+  const std::string logs = LogTestController::getInstance().log_output.str();
+  const auto errorResult = utils::StringUtils::countOccurrences(logs, "[error]");
+  const auto commitWarningResult = utils::StringUtils::countOccurrences(logs, "[warning] Caught \"Process Session Operation: State manager commit failed.\"");
+  const auto rollbackWarningResult = utils::StringUtils::countOccurrences(logs,
+    "[warning] Caught Exception during process session rollback: Process Session Operation: State manager rollback failed.");
+  return errorResult.second == 0 && commitWarningResult.second == 1 && rollbackWarningResult.second == 1;
+};
+
+auto exceptionRollbackWarnings = [] {
+  const std::string logs = LogTestController::getInstance().log_output.str();
+  const auto errorResult = utils::StringUtils::countOccurrences(logs, "[error]");
+  const auto exceptionWarningResult = utils::StringUtils::countOccurrences(logs, "[warning] Caught \"Triggering rollback\"");
+  const auto rollbackWarningResult = utils::StringUtils::countOccurrences(logs, "[warning] ProcessSession rollback for statefulProcessor executed");
+  return errorResult.second == 0 && exceptionWarningResult.second == 1 && rollbackWarningResult.second == 1;
+};
+
+const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
+  {"State_is_recorded_after_committing", {
+    {},
+    {
+     [] (core::CoreComponentStateManager& stateManager) {
+       assert(stateManager.set(exampleState));
+     },
+     [] (core::CoreComponentStateManager& stateManager) {
+       std::unordered_map<std::string, std::string> state;
+       assert(stateManager.get(state));
+       assert(state == exampleState);
+     }
+  },
+  standardLogChecker
+  }},
+  {"State_is_discarded_after_rolling_back", {
+     {},
+     {
+     [](core::CoreComponentStateManager& stateManager) {
+       assert(stateManager.set(exampleState));
+     },
+     [](core::CoreComponentStateManager& stateManager) {
+       assert(stateManager.set(exampleState2));
+       throw std::runtime_error("Triggering rollback");
+     },
+     [](core::CoreComponentStateManager& stateManager) {
+       std::unordered_map<std::string, std::string> state;
+       assert(stateManager.get(state));
+       assert(state == exampleState);
+     }
+  },
+  exceptionRollbackWarnings
+  }},
+  {
+    "Get_in_onSchedule_without_previous_state", {
+    [](core::CoreComponentStateManager& stateManager) {
+      std::unordered_map<std::string, std::string> state;
+      assert(!stateManager.get(state));
+      assert(state.empty());
+    },
+    {},
+    standardLogChecker
+    }
+  },
+  {
+    "Set_in_onSchedule", {
+    [](core::CoreComponentStateManager& stateManager) {
+      assert(stateManager.set(exampleState));
+    },
+    {
+     [](core::CoreComponentStateManager& stateManager) {
+       std::unordered_map<std::string, std::string> state;
+       assert(stateManager.get(state));
+       assert(state == exampleState);
+     }
+    },
+    standardLogChecker
+    }
+  },
+  {
+    "Clear_in_onSchedule", {
+    [](core::CoreComponentStateManager& stateManager) {
+      assert(!stateManager.clear());
+      assert(stateManager.set(exampleState));
+      assert(stateManager.clear());
+    },
+    {
+     [](core::CoreComponentStateManager& stateManager) {
+       std::unordered_map<std::string, std::string> state;
+       assert(!stateManager.get(state));
+      assert(state.empty());
+     }
+    },
+    standardLogChecker
+    },
+  },
+  {
+    "Persist_in_onSchedule", {
+    {
+      [](core::CoreComponentStateManager& stateManager) {
+        assert(stateManager.persist());
+      }
+      },
+    {},
+    standardLogChecker
+    }
+  },
+  {
+    "Manual_beginTransaction", {
+    {},
+    {
+     [](core::CoreComponentStateManager& stateManager) {
+       assert(!stateManager.beginTransaction());
+     }
+    },

Review comment:
       If it's indented, it should be 2 spaces deeper, not 1. This is not the only occurrence of this issue.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r746478931



##########
File path: libminifi/include/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h
##########
@@ -34,29 +34,39 @@ namespace controllers {
 class AbstractCoreComponentStateManagerProvider : public std::enable_shared_from_this<AbstractCoreComponentStateManagerProvider>,
                                                    public core::CoreComponentStateManagerProvider {
  public:
-  ~AbstractCoreComponentStateManagerProvider() override;
-
   std::shared_ptr<core::CoreComponentStateManager> getCoreComponentStateManager(const utils::Identifier& uuid) override;
-
   std::map<utils::Identifier, std::unordered_map<std::string, std::string>> getAllCoreComponentStates() override;
 
   class AbstractCoreComponentStateManager : public core::CoreComponentStateManager{
    public:
     AbstractCoreComponentStateManager(std::shared_ptr<AbstractCoreComponentStateManagerProvider> provider, const utils::Identifier& id);
 
-    bool set(const core::CoreComponentState& kvs) override;
+    ~AbstractCoreComponentStateManager() override;
 
+    bool set(const core::CoreComponentState& kvs) override;
     bool get(core::CoreComponentState& kvs) override;
-
     bool clear() override;
-
     bool persist() override;
 
+    bool isTransactionInProgress() const override;
+    bool beginTransaction() override;
+    bool commit() override;
+    bool rollback() override;
+
    private:
+    enum class ChangeType {
+      NONE,
+      SET,
+      CLEAR
+    };
+
     std::shared_ptr<AbstractCoreComponentStateManagerProvider> provider_;
     utils::Identifier id_;
     bool state_valid_;
     core::CoreComponentState state_;
+    bool transactionInProgress_;
+    ChangeType changeType_;
+    core::CoreComponentState stateToSet_;

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r684178332



##########
File path: cmake/BuildTests.cmake
##########
@@ -151,11 +151,13 @@ SET(INT_TEST_COUNT 0)
 FOREACH(testfile ${INTEGRATION_TESTS})
   get_filename_component(testfilename "${testfile}" NAME_WE)
   add_executable("${testfilename}" "${TEST_DIR}/integration/${testfile}")
+  target_wholearchive_library("${testfilename}" minifi-standard-processors)

Review comment:
       Line was moved after discussion in private.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r676525090



##########
File path: cmake/BuildTests.cmake
##########
@@ -151,11 +151,13 @@ SET(INT_TEST_COUNT 0)
 FOREACH(testfile ${INTEGRATION_TESTS})
   get_filename_component(testfilename "${testfile}" NAME_WE)
   add_executable("${testfilename}" "${TEST_DIR}/integration/${testfile}")
+  target_wholearchive_library("${testfilename}" minifi-standard-processors)

Review comment:
       And do we need whole archive, wouldn't normal linking be enough? I think static initialization would be done even with normal linking, but I'm not 100% sure.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r677143318



##########
File path: cmake/BuildTests.cmake
##########
@@ -151,11 +151,13 @@ SET(INT_TEST_COUNT 0)
 FOREACH(testfile ${INTEGRATION_TESTS})
   get_filename_component(testfilename "${testfile}" NAME_WE)
   add_executable("${testfilename}" "${TEST_DIR}/integration/${testfile}")
+  target_wholearchive_library("${testfilename}" minifi-standard-processors)

Review comment:
       In which executable? This one affects integration tests.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r676527858



##########
File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
##########
@@ -332,7 +334,6 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
 bool ConsumeWindowsEventLog::commitAndSaveBookmark(const std::wstring &bookmark_xml, const std::shared_ptr<core::ProcessSession> &session) {
   {
     const TimeDiff time_diff;
-    session->commit();

Review comment:
       Without this call, the whole function loses it's purpose. We should remove it. I assume we no longer need to benchmark commit times as they should be fast with recent improvements.

##########
File path: extensions/windows-event-log/tests/BookmarkTests.cpp
##########
@@ -40,7 +40,10 @@ std::unique_ptr<Bookmark> createBookmark(TestPlan &test_plan,
                                          const utils::Identifier &uuid = IdGenerator::getIdGenerator()->generate()) {
   const auto state_manager = test_plan.getStateManagerProvider()->getCoreComponentStateManager(uuid);
   const auto logger = test_plan.getLogger();
-  return utils::make_unique<Bookmark>(channel, L"*", "", uuid, false, state_manager, logger);
+  state_manager->beginTransaction();
+  auto bookmark = utils::make_unique<Bookmark>(channel, L"*", "", uuid, false, state_manager, logger);
+  state_manager->commit();

Review comment:
       What happens when we forget to create a transaction or forget to commit outside of onTrigger, where it's implicit? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r676529139



##########
File path: cmake/BuildTests.cmake
##########
@@ -151,11 +151,13 @@ SET(INT_TEST_COUNT 0)
 FOREACH(testfile ${INTEGRATION_TESTS})
   get_filename_component(testfilename "${testfile}" NAME_WE)
   add_executable("${testfilename}" "${TEST_DIR}/integration/${testfile}")
+  target_wholearchive_library("${testfilename}" minifi-standard-processors)

Review comment:
       My experience is that it would be optimized out. Also seen this in another project. Without this line the test would fail (it did until I came up with this). Maybe it also depends on linkers and their settings.
   https://stackoverflow.com/questions/7327741/initialisation-of-static-object-when-linking-against-a-static-library




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r677307361



##########
File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
##########
@@ -332,7 +334,6 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
 bool ConsumeWindowsEventLog::commitAndSaveBookmark(const std::wstring &bookmark_xml, const std::shared_ptr<core::ProcessSession> &session) {
   {
     const TimeDiff time_diff;
-    session->commit();

Review comment:
       For the record: This was a bad idea, I mistakenly identified the first block as the function body. We discussed the solution in private.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r725025035



##########
File path: libminifi/test/StatefulProcessor.h
##########
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+#include <vector>
+#include "core/Processor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class StatefulProcessor : public core::Processor {
+ public:
+  using core::Processor::Processor;
+
+  using HookType = std::function<void(core::CoreComponentStateManager&)>;
+  using HookListType = std::vector<HookType>;

Review comment:
       I agree, HookListType doesn't shorten much, unline HookType.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r726060706



##########
File path: libminifi/test/StatefulProcessor.cpp
##########
@@ -0,0 +1,67 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+
+#include "StatefulProcessor.h"
+#include "Exception.h"
+#include "core/Resource.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+void StatefulProcessor::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {

Review comment:
       This only works if `context` is valid. Consider adding `gsl_Expects(context);`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r746474611



##########
File path: extensions/coap/tests/CoapIntegrationBase.h
##########
@@ -37,7 +37,7 @@ int ssl_enable(void *, void *) {
 
 class CoapIntegrationBase : public IntegrationBase {
  public:
-  explicit CoapIntegrationBase(uint64_t waitTime = 5000)
+  explicit CoapIntegrationBase(std::chrono::milliseconds waitTime = std::chrono::milliseconds(5000))

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r676624661



##########
File path: cmake/BuildTests.cmake
##########
@@ -151,11 +151,13 @@ SET(INT_TEST_COUNT 0)
 FOREACH(testfile ${INTEGRATION_TESTS})
   get_filename_component(testfilename "${testfile}" NAME_WE)
   add_executable("${testfilename}" "${TEST_DIR}/integration/${testfile}")
+  target_wholearchive_library("${testfilename}" minifi-standard-processors)

Review comment:
       I think we have UnorderedMapKeyValueStoreService working on main without this change. I wonder how was it working before this change?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r727086593



##########
File path: libminifi/test/StatefulProcessor.cpp
##########
@@ -0,0 +1,67 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+
+#include "StatefulProcessor.h"
+#include "Exception.h"
+#include "core/Resource.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+void StatefulProcessor::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r746477485



##########
File path: extensions/windows-event-log/tests/BookmarkTests.cpp
##########
@@ -41,7 +41,8 @@ std::unique_ptr<Bookmark> createBookmark(TestPlan &test_plan,
                                          const utils::Identifier &uuid = IdGenerator::getIdGenerator()->generate()) {
   const auto state_manager = test_plan.getStateManagerProvider()->getCoreComponentStateManager(uuid);
   const auto logger = test_plan.getLogger();
-  return std::make_unique<Bookmark>(channel, L"*", "", uuid, false, state_manager, logger);
+  auto bookmark = std::make_unique<Bookmark>(channel, L"*", "", uuid, false, state_manager, logger);
+  return bookmark;

Review comment:
       Honestly, I don't remember. I think there were other commits where there were operations before returning, and they weren't completely cleaned up. I will revert and test it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r676519996



##########
File path: extensions/sql/processors/QueryDatabaseTable.cpp
##########
@@ -220,7 +219,9 @@ void QueryDatabaseTable::initializeMaxValues(core::ProcessContext &context) {
     logger_->log_info("Found no stored state");
   } else {
     if (!loadMaxValuesFromStoredState(stored_state)) {
+      state_manager_->beginTransaction();
       state_manager_->clear();
+      state_manager_->commit();

Review comment:
       From now on state transactions begin with beginTransaction and end with either commit or rollback. In onTrigger beginTransaction and commit (or rollback) are called automatically by the creation and commit/rollback calls on ProcessSession. Processor::onTrigger wraps the calling of the derived processor's onTrigger (different signature) and manages the session.
   This method above is called from onSchedule. For onSchedule there is no session, so it has to be manual.

##########
File path: cmake/BuildTests.cmake
##########
@@ -151,11 +151,13 @@ SET(INT_TEST_COUNT 0)
 FOREACH(testfile ${INTEGRATION_TESTS})
   get_filename_component(testfilename "${testfile}" NAME_WE)
   add_executable("${testfilename}" "${TEST_DIR}/integration/${testfile}")
+  target_wholearchive_library("${testfilename}" minifi-standard-processors)

Review comment:
       UnorderedMapKeyValueStoreService (or some other state storage) has to be registered, otherwise we can't use it. Registration is done at static initialization time using the REGISTER_RESOURCE macro. It would be optimized out if not linked and test would fail.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r725025809



##########
File path: libminifi/test/integration/StateTransactionalityTests.cpp
##########
@@ -0,0 +1,614 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+
+#include <iostream>
+#include "IntegrationBase.h"
+#include "../StatefulProcessor.h"
+#include "../TestBase.h"
+#include "utils/IntegrationTestUtils.h"
+#include "core/state/ProcessorController.h"
+
+using org::apache::nifi::minifi::processors::StatefulProcessor;
+using org::apache::nifi::minifi::state::ProcessorController;
+
+namespace {
+using LogChecker = std::function<bool()>;
+
+struct HookCollection {
+  StatefulProcessor::HookType onScheduleHook_;
+  StatefulProcessor::HookListType onTriggerHooks_;
+  LogChecker logChecker_;
+};
+
+class StatefulIntegrationTest : public IntegrationBase {
+ public:
+  explicit StatefulIntegrationTest(std::string testCase, HookCollection hookCollection)
+    : onScheduleHook_(std::move(hookCollection.onScheduleHook_))
+    , onTriggerHooks_(std::move(hookCollection.onTriggerHooks_))
+    , logChecker_(hookCollection.logChecker_)
+    , testCase_(std::move(testCase)) {
+  }
+
+  void testSetup() override {
+    LogTestController::getInstance().reset();
+    LogTestController::getInstance().setDebug<core::ProcessGroup>();
+    LogTestController::getInstance().setDebug<core::Processor>();
+    LogTestController::getInstance().setDebug<core::ProcessSession>();
+    LogTestController::getInstance().setDebug<StatefulIntegrationTest>();
+    logger_->log_info("Running test case \"%s\"", testCase_);
+  }
+
+  void updateProperties(std::shared_ptr<minifi::FlowController> fc) override {
+    const auto controllerVec = fc->getAllComponents();
+    /* This tests depends on a configuration that contains only one StatefulProcessor named statefulProcessor
+     * (See TestStateTransactionality.yml)
+     * In this case there are two components in the flowcontroller: first is the controller itself,
+     * second is the processor that the test uses.
+     * Added here some assertions to make it clear. In case any of these fail without changing the corresponding yml file,
+     * that most probably means a breaking change. */
+    assert(controllerVec.size() == 2);
+    assert(controllerVec[0]->getComponentName() == "FlowController");
+    assert(controllerVec[1]->getComponentName() == "statefulProcessor");
+
+    // set hooks
+    const auto processController = std::dynamic_pointer_cast<ProcessorController>(controllerVec[1]);
+    assert(processController != nullptr);
+    statefulProcessor_ = std::dynamic_pointer_cast<StatefulProcessor>(processController->getProcessor());
+    assert(statefulProcessor_ != nullptr);
+    statefulProcessor_->setHooks(onScheduleHook_, onTriggerHooks_);
+  }
+
+  void runAssertions() override {
+    using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
+    assert(verifyEventHappenedInPollTime(std::chrono::milliseconds(wait_time_), [&] {
+      return statefulProcessor_->hasFinishedHooks() && logChecker_();
+    }));
+  }
+
+ private:
+  const StatefulProcessor::HookType onScheduleHook_;
+  const StatefulProcessor::HookListType onTriggerHooks_;
+  const LogChecker logChecker_;
+  const std::string testCase_;
+  std::shared_ptr<StatefulProcessor> statefulProcessor_;
+  std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<StatefulIntegrationTest>::getLogger()};
+};
+
+const std::unordered_map<std::string, std::string> exampleState{{"key1", "value1"}, {"key2", "value2"}};
+const std::unordered_map<std::string, std::string> exampleState2{{"key3", "value3"}, {"key4", "value4"}};
+
+auto standardLogChecker = [] {
+  const std::string logs = LogTestController::getInstance().log_output.str();
+  const auto errorResult = utils::StringUtils::countOccurrences(logs, "[error]");
+  const auto warningResult = utils::StringUtils::countOccurrences(logs, "[warning]");
+  return errorResult.second == 0 && warningResult.second == 0;
+};
+
+auto commitAndRollbackWarnings = [] {
+  const std::string logs = LogTestController::getInstance().log_output.str();
+  const auto errorResult = utils::StringUtils::countOccurrences(logs, "[error]");
+  const auto commitWarningResult = utils::StringUtils::countOccurrences(logs, "[warning] Caught \"Process Session Operation: State manager commit failed.\"");
+  const auto rollbackWarningResult = utils::StringUtils::countOccurrences(logs,
+    "[warning] Caught Exception during process session rollback: Process Session Operation: State manager rollback failed.");
+  return errorResult.second == 0 && commitWarningResult.second == 1 && rollbackWarningResult.second == 1;
+};
+
+auto exceptionRollbackWarnings = [] {
+  const std::string logs = LogTestController::getInstance().log_output.str();
+  const auto errorResult = utils::StringUtils::countOccurrences(logs, "[error]");
+  const auto exceptionWarningResult = utils::StringUtils::countOccurrences(logs, "[warning] Caught \"Triggering rollback\"");
+  const auto rollbackWarningResult = utils::StringUtils::countOccurrences(logs, "[warning] ProcessSession rollback for statefulProcessor executed");
+  return errorResult.second == 0 && exceptionWarningResult.second == 1 && rollbackWarningResult.second == 1;
+};
+
+const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
+  {"State_is_recorded_after_committing", {
+    {},
+    {
+     [] (core::CoreComponentStateManager& stateManager) {
+       assert(stateManager.set(exampleState));
+     },
+     [] (core::CoreComponentStateManager& stateManager) {
+       std::unordered_map<std::string, std::string> state;
+       assert(stateManager.get(state));
+       assert(state == exampleState);
+     }
+  },
+  standardLogChecker
+  }},
+  {"State_is_discarded_after_rolling_back", {
+     {},
+     {
+     [](core::CoreComponentStateManager& stateManager) {
+       assert(stateManager.set(exampleState));
+     },
+     [](core::CoreComponentStateManager& stateManager) {
+       assert(stateManager.set(exampleState2));
+       throw std::runtime_error("Triggering rollback");
+     },
+     [](core::CoreComponentStateManager& stateManager) {
+       std::unordered_map<std::string, std::string> state;
+       assert(stateManager.get(state));
+       assert(state == exampleState);
+     }
+  },
+  exceptionRollbackWarnings
+  }},
+  {
+    "Get_in_onSchedule_without_previous_state", {
+    [](core::CoreComponentStateManager& stateManager) {
+      std::unordered_map<std::string, std::string> state;
+      assert(!stateManager.get(state));
+      assert(state.empty());
+    },
+    {},
+    standardLogChecker
+    }
+  },
+  {
+    "Set_in_onSchedule", {
+    [](core::CoreComponentStateManager& stateManager) {
+      assert(stateManager.set(exampleState));
+    },
+    {
+     [](core::CoreComponentStateManager& stateManager) {
+       std::unordered_map<std::string, std::string> state;
+       assert(stateManager.get(state));
+       assert(state == exampleState);
+     }
+    },
+    standardLogChecker
+    }
+  },
+  {
+    "Clear_in_onSchedule", {
+    [](core::CoreComponentStateManager& stateManager) {
+      assert(!stateManager.clear());
+      assert(stateManager.set(exampleState));
+      assert(stateManager.clear());
+    },
+    {
+     [](core::CoreComponentStateManager& stateManager) {
+       std::unordered_map<std::string, std::string> state;
+       assert(!stateManager.get(state));
+      assert(state.empty());
+     }
+    },
+    standardLogChecker
+    },
+  },
+  {
+    "Persist_in_onSchedule", {
+    {
+      [](core::CoreComponentStateManager& stateManager) {
+        assert(stateManager.persist());
+      }
+      },
+    {},
+    standardLogChecker
+    }
+  },
+  {
+    "Manual_beginTransaction", {
+    {},
+    {
+     [](core::CoreComponentStateManager& stateManager) {
+       assert(!stateManager.beginTransaction());
+     }
+    },

Review comment:
       Fixed everywhere, please check, I might missed something.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a change in pull request #1107: MINIFICPP-1338 - Handle processor state persistence through the session

Posted by GitBox <gi...@apache.org>.
adam-markovics commented on a change in pull request #1107:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r677143048



##########
File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
##########
@@ -332,7 +334,6 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
 bool ConsumeWindowsEventLog::commitAndSaveBookmark(const std::wstring &bookmark_xml, const std::shared_ptr<core::ProcessSession> &session) {
   {
     const TimeDiff time_diff;
-    session->commit();

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org