Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/09/02 15:32:13 UTC

[GitHub] [nifi-minifi-cpp] lordgamez opened a new pull request #891: MINIFICPP-1349 ConsumeWindowsEventLogs should honor batch commit size in a single session

lordgamez opened a new pull request #891:
URL: https://github.com/apache/nifi-minifi-cpp/pull/891


   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically main)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #891: MINIFICPP-1349 ConsumeWindowsEventLogs should honor batch commit size in a single session

Posted by GitBox <gi...@apache.org>.
hunyadi-dev commented on a change in pull request #891:
URL: https://github.com/apache/nifi-minifi-cpp/pull/891#discussion_r483092783



##########
File path: extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp
##########
@@ -367,27 +372,19 @@ void batchCommitSizeTestHelper(int batch_commit_size, int expected_num_commits)
   test_plan->reset();
   LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
 
-  {
-    reportEvent(APPLICATION_CHANNEL, "Event one");
-    reportEvent(APPLICATION_CHANNEL, "Event two");
-    reportEvent(APPLICATION_CHANNEL, "Event three");
-    reportEvent(APPLICATION_CHANNEL, "Event four");
-    reportEvent(APPLICATION_CHANNEL, "Event five");
-
-    test_controller.runSession(test_plan);
-
-    REQUIRE(LogTestController::getInstance().countOccurrences("processQueue commit") == expected_num_commits);
-  }
+  std::vector<std::string> events{"Event one", "Event two", "Event three", "Event four", "Event five"};

Review comment:
       Minor, but why not just use a simple range-based for loop here?
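   A minimal sketch of the range-based loop suggested above. `reportEvent` is stubbed here (it only records the message), and `reportAll`, `reported_messages` and the `"Application"` channel string are hypothetical illustrations, not the test file's actual code.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for the test's reportEvent(channel, message) helper;
// it only records the message so the loop's effect can be inspected.
std::vector<std::string> reported_messages;

void reportEvent(const char* channel, const char* message) {
  assert(channel != nullptr && message != nullptr);
  reported_messages.emplace_back(message);
}

// A plain range-based for loop instead of std::for_each with a lambda.
void reportAll(const std::vector<std::string>& events) {
  for (const auto& event : events) {
    reportEvent("Application", event.c_str());
  }
}
```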







[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #891: MINIFICPP-1349 ConsumeWindowsEventLogs should honor batch commit size in a single session

Posted by GitBox <gi...@apache.org>.
lordgamez commented on a change in pull request #891:
URL: https://github.com/apache/nifi-minifi-cpp/pull/891#discussion_r484782831



##########
File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
##########
@@ -298,120 +298,108 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
   logger_->log_trace("Successfully configured CWEL");
 }
 
+bool ConsumeWindowsEventLog::commitAndSaveBookmark(const std::wstring &bookmark_xml, const std::shared_ptr<core::ProcessSession> &session) {
+  {
+    const TimeDiff time_diff;
+    session->commit();
+    logger_->log_debug("processQueue commit took %" PRId64 " ms", time_diff());
+  }
+
+  if (!bookmark_->saveBookmarkXml(bookmark_xml)) {
+    logger_->log_error("Failed to save bookmark xml");
+  }
+
+  if (session->outgoingConnectionsFull("success")) {
+    logger_->log_debug("Outgoing success connection is full");
+    return false;
+  }
+
+  return true;
+}
+
+std::wstring ConsumeWindowsEventLog::processEventLogs(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session,
+    size_t& processed_event_count, const EVT_HANDLE& event_query_results) {

Review comment:
       Unfortunately not, because of the `getEventLogHandler` call inside `createEventRender`. However, I made two other methods const in [0a32d4e](https://github.com/apache/nifi-minifi-cpp/pull/891/commits/0a32d4e3eb179ea9fbf8d2914c70997430f91c58)
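   A sketch of the general C++ rule behind this answer, assuming (as the reply implies) that `getEventLogHandler` is non-const: a `const` member function cannot call a non-const member function on `this`. The `Renderer` class and its methods are hypothetical and only mirror the shape of the problem.

```cpp
#include <cassert>
#include <string>

// Hypothetical class illustrating why a method cannot be marked const
// when it calls a non-const member function.
class Renderer {
 public:
  // Non-const: lazily initializes a cached handle, so it mutates state.
  int getHandle() {
    if (handle_ == 0) handle_ = 42;  // hypothetical lazy initialization
    return handle_;
  }

  // Must stay non-const because it calls getHandle(); declaring it
  // `std::string render() const` would fail to compile.
  std::string render() { return "handle=" + std::to_string(getHandle()); }

  // Helpers that only read state can be const.
  bool hasHandle() const { return handle_ != 0; }

 private:
  int handle_ = 0;
};
```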







[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #891: MINIFICPP-1349 ConsumeWindowsEventLogs should honor batch commit size in a single session

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #891:
URL: https://github.com/apache/nifi-minifi-cpp/pull/891#discussion_r482939199



##########
File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
##########
@@ -298,120 +298,108 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
   logger_->log_trace("Successfully configured CWEL");
 }
 
+bool ConsumeWindowsEventLog::commitAndSaveBookmark(const std::wstring &bookmark_xml, const std::shared_ptr<core::ProcessSession> &session) {
+  {
+    const TimeDiff time_diff;
+    session->commit();
+    logger_->log_debug("processQueue commit took %" PRId64 " ms", time_diff());
+  }
+
+  if (!bookmark_->saveBookmarkXml(bookmark_xml)) {
+    logger_->log_error("Failed to save bookmark xml");
+  }
+
+  if (session->outgoingConnectionsFull("success")) {
+    logger_->log_debug("Outgoing success connection is full");
+    return false;
+  }
+
+  return true;
+}
+
+std::wstring ConsumeWindowsEventLog::processEventLogs(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session,
+    size_t& processed_event_count, const EVT_HANDLE& event_query_results) {
+  std::wstring bookmark_xml;
+  logger_->log_trace("Enumerating the events in the result set after the bookmarked event.");
+  while (processed_event_count < batch_commit_size_ || batch_commit_size_ == 0) {
+    EVT_HANDLE next_event{};
+    DWORD handles_set_count{};
+    if (!EvtNext(event_query_results, 1, &next_event, EVT_NEXT_TIMEOUT_MS, 0, &handles_set_count)) {
+      if (ERROR_NO_MORE_ITEMS != GetLastError()) {
+        LogWindowsError("Failed to get next event");
+        continue;
+        /* According to MS this iteration should only end when the return value is false AND
+          the error code is NO_MORE_ITEMS. See the following page for further details:
+          https://docs.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtnext */
+      }
+      break;
+    }
+
+    const auto guard_next_event = gsl::finally([next_event]() { EvtClose(next_event); });
+    logger_->log_trace("Succesfully got the next event, performing event rendering");
+    EventRender event_render;
+    std::wstring new_bookmark_xml;
+    if (createEventRender(next_event, event_render) && bookmark_->getNewBookmarkXml(next_event, new_bookmark_xml)) {
+      bookmark_xml = std::move(new_bookmark_xml);
+      processed_event_count++;
+      putEventRenderFlowFileToSession(event_render, *session);
+    }
+  }
+  logger_->log_trace("Finished enumerating events.");
+  return bookmark_xml;
+}
 
 void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  if (!pBookmark_) {
-    logger_->log_debug("pBookmark_ is null");
+  if (!bookmark_) {
+    logger_->log_debug("bookmark_ is null");
     context->yield();
     return;
   }
 
-  std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
+  std::unique_lock<std::mutex> lock(on_trigger_mutex_, std::try_to_lock);
   if (!lock.owns_lock()) {
     logger_->log_warn("processor was triggered before previous listing finished, configuration should be revised!");
     return;
   }
 
   logger_->log_trace("CWEL onTrigger");
 
-  struct TimeDiff {
-    auto operator()() const {
-      return int64_t{ std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - time_).count() };
-    }
-    const decltype(std::chrono::steady_clock::now()) time_ = std::chrono::steady_clock::now();
-  };
-
-  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml) {
-    const TimeDiff timeDiff;
-    session->commit();
-    logger_->log_debug("processQueue commit took %" PRId64 " ms", timeDiff());
-
-    const bool successful_save = pBookmark_->saveBookmarkXml(bookmarkXml);
-    if (!successful_save) {
-      logger_->log_error("Failed to save bookmark xml");
-    }
-
-    if (session->outgoingConnectionsFull("success")) {
-      logger_->log_debug("outgoingConnectionsFull");
-      return false;
-    }
-
-    return true;
-  };
-
-  size_t eventCount = 0;
-  const TimeDiff timeDiff;
+  size_t processed_event_count = 0;
+  const TimeDiff time_diff;
   const auto timeGuard = gsl::finally([&]() {
-    logger_->log_debug("processed %zu Events in %"  PRId64 " ms", eventCount, timeDiff());
+    logger_->log_debug("processed %zu Events in %"  PRId64 " ms", processed_event_count, time_diff());
   });
 
-  size_t commitAndSaveBookmarkCount = 0;
-  std::wstring bookmarkXml;
-
-  const auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
-  if (!hEventResults) {
+  const auto event_query_results = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
+  if (!event_query_results) {
     LOG_LAST_ERROR(EvtQuery);
     context->yield();
     return;
   }
-  const auto guard_hEventResults = gsl::finally([hEventResults]() { EvtClose(hEventResults); });
+  const auto guard_event_query_results = gsl::finally([event_query_results]() { EvtClose(event_query_results); });
 
   logger_->log_trace("Retrieved results in Channel: %ls with Query: %ls", wstrChannel_.c_str(), wstrQuery_.c_str());
 
-  auto hBookmark = pBookmark_->getBookmarkHandleFromXML();
-  if (!hBookmark) {
-    logger_->log_error("hBookmark is null, unrecoverable error!"); 
-    pBookmark_.reset();
+  auto bookmark_handle = bookmark_->getBookmarkHandleFromXML();
+  if (!bookmark_handle) {
+    logger_->log_error("bookmark_handle is null, unrecoverable error!");
+    bookmark_.reset();
     context->yield();
     return;
   }
 
-  if (!EvtSeek(hEventResults, 1, hBookmark, 0, EvtSeekRelativeToBookmark)) {
+  if (!EvtSeek(event_query_results, 1, bookmark_handle, 0, EvtSeekRelativeToBookmark)) {
     LOG_LAST_ERROR(EvtSeek);
     context->yield();
     return;
   }
 
   refreshTimeZoneData();
+  auto bookmark_xml = processEventLogs(context,session, processed_event_count, event_query_results);
 
-  logger_->log_trace("Enumerating the events in the result set after the bookmarked event.");
-  while (true) {
-    EVT_HANDLE hEvent{};
-    DWORD dwReturned{};
-    if (!EvtNext(hEventResults, 1, &hEvent, EVT_NEXT_TIMEOUT_MS, 0, &dwReturned)) {
-      if (ERROR_NO_MORE_ITEMS != GetLastError()) {
-        LogWindowsError("Failed to get next event");
-        continue; 
-        /* According to MS this iteration should only end when the return value is false AND 
-         the error code is NO_MORE_ITEMS. See the following page for further details:
-         https://docs.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtnext */
-      }
-      break;
-    }
-    const auto guard_hEvent = gsl::finally([hEvent]() { EvtClose(hEvent); });
-    logger_->log_trace("Succesfully get the next hEvent, performing event rendering");
-    EventRender eventRender;
-    std::wstring newBookmarkXml;
-    if (createEventRender(hEvent, eventRender) && pBookmark_->getNewBookmarkXml(hEvent, newBookmarkXml)) {
-      bookmarkXml = std::move(newBookmarkXml);
-      eventCount++;
-      putEventRenderFlowFileToSession(eventRender, *session);
-
-      if (batch_commit_size_ != 0U && (eventCount % batch_commit_size_ == 0)) {
-        if (!commitAndSaveBookmark(bookmarkXml)) {
-          context->yield();
-          return;
-        }
-
-        commitAndSaveBookmarkCount = eventCount;
-      }
-    }
-  }
-
-  logger_->log_trace("Finish enumerating events.");
-
-  if (eventCount > commitAndSaveBookmarkCount) {
-    commitAndSaveBookmark(bookmarkXml);
+  if (processed_event_count > 0 && !commitAndSaveBookmark(bookmark_xml, session)) {

Review comment:
       I think we should yield if no events were processed, too.  I know we didn't before this change, but that seems like a bug.
   ```suggestion
     if (processed_event_count == 0 || !commitAndSaveBookmark(bookmark_xml, session)) {
   ```
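   A hedged sketch of how the suggested condition behaves. `shouldYield` is a hypothetical free function, and the callback stands in for `commitAndSaveBookmark`, which returns false when the outgoing `success` connection is full. Because `||` short-circuits, nothing is committed when no events were processed, and the processor yields in that case as well.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>

// Hypothetical model of the suggested onTrigger tail: yield when nothing was
// processed, or when the commit reports that the processor should back off.
// `commit` stands in for commitAndSaveBookmark(bookmark_xml, session).
bool shouldYield(std::size_t processed_event_count, const std::function<bool()>& commit) {
  // Short-circuit: with zero processed events, commit is never invoked.
  return processed_event_count == 0 || !commit();
}
```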

##########
File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
##########
@@ -298,120 +298,108 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
   logger_->log_trace("Successfully configured CWEL");
 }
 
+bool ConsumeWindowsEventLog::commitAndSaveBookmark(const std::wstring &bookmark_xml, const std::shared_ptr<core::ProcessSession> &session) {
+  {
+    const TimeDiff time_diff;
+    session->commit();
+    logger_->log_debug("processQueue commit took %" PRId64 " ms", time_diff());
+  }
+
+  if (!bookmark_->saveBookmarkXml(bookmark_xml)) {
+    logger_->log_error("Failed to save bookmark xml");
+  }
+
+  if (session->outgoingConnectionsFull("success")) {
+    logger_->log_debug("Outgoing success connection is full");
+    return false;
+  }
+
+  return true;
+}
+
+std::wstring ConsumeWindowsEventLog::processEventLogs(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session,
+    size_t& processed_event_count, const EVT_HANDLE& event_query_results) {

Review comment:
       I would do this the other way round: return the number of events processed, and use an out parameter for the bookmark XML.  Not important, but I think that is a more common pattern.
   
   Another option would be to return a struct `{ size_t processed_event_count; std::wstring bookmark_xml; }`.
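   A sketch of the struct-returning variant, assuming C++17 structured bindings are available. `ProcessEventsResult` and `processEvents` are hypothetical simplifications: plain strings stand in for rendered events and their bookmarks, and a batch size of 0 means no limit, matching the loop condition in the diff. Compared with the `std::tuple` plus `std::tie` approach, named fields make each output self-describing at the call site.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical result type bundling both outputs of processEventLogs.
struct ProcessEventsResult {
  std::size_t processed_event_count = 0;
  std::wstring bookmark_xml;
};

// Simplified stand-in for processEventLogs: consumes at most batch_size
// events (0 = unlimited) and keeps the bookmark of the last consumed one.
ProcessEventsResult processEvents(const std::vector<std::wstring>& bookmarks, std::size_t batch_size) {
  ProcessEventsResult result;
  for (const auto& bookmark : bookmarks) {
    if (batch_size != 0 && result.processed_event_count >= batch_size) break;
    result.bookmark_xml = bookmark;
    ++result.processed_event_count;
  }
  return result;
}
```

   At the call site this reads as `const auto [count, bookmark_xml] = processEvents(events, batch_size);`.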

##########
File path: extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp
##########
@@ -367,27 +372,19 @@ void batchCommitSizeTestHelper(int batch_commit_size, int expected_num_commits)
   test_plan->reset();
   LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
 
-  {
-    reportEvent(APPLICATION_CHANNEL, "Event one");
-    reportEvent(APPLICATION_CHANNEL, "Event two");
-    reportEvent(APPLICATION_CHANNEL, "Event three");
-    reportEvent(APPLICATION_CHANNEL, "Event four");
-    reportEvent(APPLICATION_CHANNEL, "Event five");
-
-    test_controller.runSession(test_plan);
-
-    REQUIRE(LogTestController::getInstance().countOccurrences("processQueue commit") == expected_num_commits);
-  }
+  std::vector<std::string> events{"Event one", "Event two", "Event three", "Event four", "Event five"};
+  std::for_each(events.begin(), events.end(), [](const std::string& event){ reportEvent(APPLICATION_CHANNEL, event.c_str()); });
+  test_controller.runSession(test_plan);
+  auto expected_event_count = events.size() <= batch_commit_size || batch_commit_size == 0 ? events.size() : batch_commit_size;
+  REQUIRE(LogTestController::getInstance().contains("processed " + std::to_string(expected_event_count) + " Events"));
 }
 
 }  // namespace
 
 TEST_CASE("ConsumeWindowsEventLog batch commit size works", "[onTrigger]") {
-  batchCommitSizeTestHelper(1000, 1);
-  batchCommitSizeTestHelper(5, 1);
-  batchCommitSizeTestHelper(4, 2);
-  batchCommitSizeTestHelper(3, 2);
-  batchCommitSizeTestHelper(2, 3);
-  batchCommitSizeTestHelper(1, 5);
-  batchCommitSizeTestHelper(0, 1);
+  batchCommitSizeTestHelper(1000);
+  batchCommitSizeTestHelper(5);
+  batchCommitSizeTestHelper(4);
+  batchCommitSizeTestHelper(1);
+  batchCommitSizeTestHelper(0);

Review comment:
       I think the test is clearer if you explicitly specify the expected result, as before.  Maybe the number of events could be a parameter, too, eg. `batchCommitSizeTestHelper(int num_events_read, int batch_size, int num_events_processed)`.
   ```suggestion
     batchCommitSizeTestHelper(5, 1000, 5);
     batchCommitSizeTestHelper(5, 5, 5);
     batchCommitSizeTestHelper(5, 4, 4);
     batchCommitSizeTestHelper(5, 1, 1);
     batchCommitSizeTestHelper(5, 0, 5);
   ```
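   A sketch of the table-driven form this suggestion points toward, with every case stating its expected result explicitly. `simulateTrigger` is a hypothetical stand-in for one `onTrigger` run that mirrors the diff's loop condition (0 means no batch limit); the real test would run the processor and inspect the log instead. Using `std::size_t` throughout also avoids comparing `events.size()` with a signed `int`, as the current helper does.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for one onTrigger run: reads events until the batch
// limit is reached (0 = no limit) or the event log is exhausted.
std::size_t simulateTrigger(std::size_t num_events, std::size_t batch_size) {
  std::size_t processed = 0;
  while ((batch_size == 0 || processed < batch_size) && processed < num_events) {
    ++processed;
  }
  return processed;
}

// Each case spells out its expected result instead of recomputing it.
struct BatchCommitCase {
  std::size_t num_events_read;
  std::size_t batch_size;
  std::size_t num_events_processed;
};

constexpr BatchCommitCase kBatchCommitCases[] = {
    {5, 1000, 5}, {5, 5, 5}, {5, 4, 4}, {5, 1, 1}, {5, 0, 5},
};

bool allCasesPass() {
  for (const auto& c : kBatchCommitCases) {
    if (simulateTrigger(c.num_events_read, c.batch_size) != c.num_events_processed) return false;
  }
  return true;
}
```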

##########
File path: extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp
##########
@@ -335,21 +335,26 @@ TEST_CASE("ConsumeWindowsEventLog prints events in XML correctly", "[onTrigger]"
     test_controller.runSession(test_plan);
 
     REQUIRE(LogTestController::getInstance().contains(R"(<Event xmlns="http://schemas.microsoft.com/win/2004/08/events/event"><System><Provider Name="Application"/>)"));
-    REQUIRE(LogTestController::getInstance().contains(R"(<EventID Qualifiers="0">14985</EventID><Level>4</Level><Task>0</Task><Keywords>0x80000000000000</Keywords><TimeCreated SystemTime=")"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<EventID Qualifiers="0">14985</EventID>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<Level>4</Level>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<Task>0</Task>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<Keywords>0x80000000000000</Keywords><TimeCreated SystemTime=")"));
     // the timestamp (when the event was published) goes here
     REQUIRE(LogTestController::getInstance().contains(R"("/><EventRecordID>)"));
     // the ID of the event goes here (a number)
-    REQUIRE(LogTestController::getInstance().contains(R"(</EventRecordID><Channel>Application</Channel><Computer>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(</EventRecordID>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<Channel>Application</Channel><Computer>)"));

Review comment:
       Why did you change this?  This is now a weaker condition, allowing additional elements, any order of the elements, multiple XML blocks each containing some of these elements etc.
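   A small illustration of why the split checks are weaker, assuming `LogTestController::contains()` behaves like a plain substring search. The `contains`, `combinedCheck` and `splitCheck` functions are hypothetical. A log with the two elements swapped passes the split checks but fails the original combined one.

```cpp
#include <cassert>
#include <string>

// Assumed behaviour of LogTestController::contains(): a plain substring search.
bool contains(const std::string& log, const std::string& fragment) {
  return log.find(fragment) != std::string::npos;
}

// Original style: the elements must be adjacent and in this order.
bool combinedCheck(const std::string& log) {
  return contains(log, "<Level>4</Level><Task>0</Task>");
}

// Split style: each element only has to appear somewhere in the log.
bool splitCheck(const std::string& log) {
  return contains(log, "<Level>4</Level>") && contains(log, "<Task>0</Task>");
}
```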

##########
File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
##########
@@ -298,120 +298,108 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
   logger_->log_trace("Successfully configured CWEL");
 }
 
+bool ConsumeWindowsEventLog::commitAndSaveBookmark(const std::wstring &bookmark_xml, const std::shared_ptr<core::ProcessSession> &session) {
+  {
+    const TimeDiff time_diff;
+    session->commit();
+    logger_->log_debug("processQueue commit took %" PRId64 " ms", time_diff());
+  }
+
+  if (!bookmark_->saveBookmarkXml(bookmark_xml)) {
+    logger_->log_error("Failed to save bookmark xml");
+  }
+
+  if (session->outgoingConnectionsFull("success")) {
+    logger_->log_debug("Outgoing success connection is full");
+    return false;
+  }
+
+  return true;
+}
+
+std::wstring ConsumeWindowsEventLog::processEventLogs(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session,
+    size_t& processed_event_count, const EVT_HANDLE& event_query_results) {

Review comment:
       Could this be `const`?







[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #891: MINIFICPP-1349 ConsumeWindowsEventLogs should honor batch commit size in a single session

Posted by GitBox <gi...@apache.org>.
lordgamez commented on a change in pull request #891:
URL: https://github.com/apache/nifi-minifi-cpp/pull/891#discussion_r489387584



##########
File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
##########
@@ -298,120 +298,111 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
   logger_->log_trace("Successfully configured CWEL");
 }
 
+bool ConsumeWindowsEventLog::commitAndSaveBookmark(const std::wstring &bookmark_xml, const std::shared_ptr<core::ProcessSession> &session) {
+  {
+    const TimeDiff time_diff;
+    session->commit();
+    logger_->log_debug("processQueue commit took %" PRId64 " ms", time_diff());
+  }
+
+  if (!bookmark_->saveBookmarkXml(bookmark_xml)) {
+    logger_->log_error("Failed to save bookmark xml");
+  }
+
+  if (session->outgoingConnectionsFull("success")) {
+    logger_->log_debug("Outgoing success connection is full");
+    return false;
+  }
+
+  return true;
+}
+
+std::tuple<size_t, std::wstring> ConsumeWindowsEventLog::processEventLogs(const std::shared_ptr<core::ProcessContext> &context,
+    const std::shared_ptr<core::ProcessSession> &session, const EVT_HANDLE& event_query_results) {
+  size_t processed_event_count = 0;
+  std::wstring bookmark_xml;
+  logger_->log_trace("Enumerating the events in the result set after the bookmarked event.");
+  while (processed_event_count < batch_commit_size_ || batch_commit_size_ == 0) {
+    EVT_HANDLE next_event{};
+    DWORD handles_set_count{};
+    if (!EvtNext(event_query_results, 1, &next_event, EVT_NEXT_TIMEOUT_MS, 0, &handles_set_count)) {
+      if (ERROR_NO_MORE_ITEMS != GetLastError()) {
+        LogWindowsError("Failed to get next event");
+        continue;
+        /* According to MS this iteration should only end when the return value is false AND
+          the error code is NO_MORE_ITEMS. See the following page for further details:
+          https://docs.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtnext */
+      }
+      break;
+    }
+
+    const auto guard_next_event = gsl::finally([next_event]() { EvtClose(next_event); });
+    logger_->log_trace("Succesfully got the next event, performing event rendering");
+    EventRender event_render;
+    std::wstring new_bookmark_xml;
+    if (createEventRender(next_event, event_render) && bookmark_->getNewBookmarkXml(next_event, new_bookmark_xml)) {
+      bookmark_xml = std::move(new_bookmark_xml);
+      processed_event_count++;
+      putEventRenderFlowFileToSession(event_render, *session);
+    }
+  }
+  logger_->log_trace("Finished enumerating events.");
+  return std::make_tuple(processed_event_count, bookmark_xml);
+}
 
 void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  if (!pBookmark_) {
-    logger_->log_debug("pBookmark_ is null");
+  if (!bookmark_) {
+    logger_->log_debug("bookmark_ is null");
     context->yield();
     return;
   }
 
-  std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
+  std::unique_lock<std::mutex> lock(on_trigger_mutex_, std::try_to_lock);
   if (!lock.owns_lock()) {
     logger_->log_warn("processor was triggered before previous listing finished, configuration should be revised!");
     return;
   }
 
   logger_->log_trace("CWEL onTrigger");
 
-  struct TimeDiff {
-    auto operator()() const {
-      return int64_t{ std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - time_).count() };
-    }
-    const decltype(std::chrono::steady_clock::now()) time_ = std::chrono::steady_clock::now();
-  };
-
-  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml) {
-    const TimeDiff timeDiff;
-    session->commit();
-    logger_->log_debug("processQueue commit took %" PRId64 " ms", timeDiff());
-
-    const bool successful_save = pBookmark_->saveBookmarkXml(bookmarkXml);
-    if (!successful_save) {
-      logger_->log_error("Failed to save bookmark xml");
-    }
-
-    if (session->outgoingConnectionsFull("success")) {
-      logger_->log_debug("outgoingConnectionsFull");
-      return false;
-    }
-
-    return true;
-  };
-
-  size_t eventCount = 0;
-  const TimeDiff timeDiff;
+  size_t processed_event_count = 0;
+  const TimeDiff time_diff;
   const auto timeGuard = gsl::finally([&]() {
-    logger_->log_debug("processed %zu Events in %"  PRId64 " ms", eventCount, timeDiff());
+    logger_->log_debug("processed %zu Events in %"  PRId64 " ms", processed_event_count, time_diff());
   });
 
-  size_t commitAndSaveBookmarkCount = 0;
-  std::wstring bookmarkXml;
-
-  const auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
-  if (!hEventResults) {
+  const auto event_query_results = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
+  if (!event_query_results) {
     LOG_LAST_ERROR(EvtQuery);
     context->yield();
     return;
   }
-  const auto guard_hEventResults = gsl::finally([hEventResults]() { EvtClose(hEventResults); });
+  const auto guard_event_query_results = gsl::finally([event_query_results]() { EvtClose(event_query_results); });
 
   logger_->log_trace("Retrieved results in Channel: %ls with Query: %ls", wstrChannel_.c_str(), wstrQuery_.c_str());
 
-  auto hBookmark = pBookmark_->getBookmarkHandleFromXML();
-  if (!hBookmark) {
-    logger_->log_error("hBookmark is null, unrecoverable error!"); 
-    pBookmark_.reset();
+  auto bookmark_handle = bookmark_->getBookmarkHandleFromXML();
+  if (!bookmark_handle) {
+    logger_->log_error("bookmark_handle is null, unrecoverable error!");
+    bookmark_.reset();
     context->yield();
     return;
   }
 
-  if (!EvtSeek(hEventResults, 1, hBookmark, 0, EvtSeekRelativeToBookmark)) {
+  if (!EvtSeek(event_query_results, 1, bookmark_handle, 0, EvtSeekRelativeToBookmark)) {
     LOG_LAST_ERROR(EvtSeek);
     context->yield();
     return;
   }
 
   refreshTimeZoneData();
 
-  logger_->log_trace("Enumerating the events in the result set after the bookmarked event.");
-  while (true) {
-    EVT_HANDLE hEvent{};
-    DWORD dwReturned{};
-    if (!EvtNext(hEventResults, 1, &hEvent, EVT_NEXT_TIMEOUT_MS, 0, &dwReturned)) {
-      if (ERROR_NO_MORE_ITEMS != GetLastError()) {
-        LogWindowsError("Failed to get next event");
-        continue; 
-        /* According to MS this iteration should only end when the return value is false AND 
-         the error code is NO_MORE_ITEMS. See the following page for further details:
-         https://docs.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtnext */
-      }
-      break;
-    }
-    const auto guard_hEvent = gsl::finally([hEvent]() { EvtClose(hEvent); });
-    logger_->log_trace("Succesfully get the next hEvent, performing event rendering");
-    EventRender eventRender;
-    std::wstring newBookmarkXml;
-    if (createEventRender(hEvent, eventRender) && pBookmark_->getNewBookmarkXml(hEvent, newBookmarkXml)) {
-      bookmarkXml = std::move(newBookmarkXml);
-      eventCount++;
-      putEventRenderFlowFileToSession(eventRender, *session);
-
-      if (batch_commit_size_ != 0U && (eventCount % batch_commit_size_ == 0)) {
-        if (!commitAndSaveBookmark(bookmarkXml)) {
-          context->yield();
-          return;
-        }
-
-        commitAndSaveBookmarkCount = eventCount;
-      }
-    }
-  }
-
-  logger_->log_trace("Finish enumerating events.");
+  std::wstring bookmark_xml;
+  std::tie(processed_event_count, bookmark_xml) = processEventLogs(context, session, event_query_results);
 
-  if (eventCount > commitAndSaveBookmarkCount) {
-    commitAndSaveBookmark(bookmarkXml);
+  if (processed_event_count == 0 || !commitAndSaveBookmark(bookmark_xml, session)) {
+    context->yield();

Review comment:
       @fgerlits Could you elaborate on why you suggested that we should yield in this case?







[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #891: MINIFICPP-1349 ConsumeWindowsEventLogs should honor batch commit size in a single session

Posted by GitBox <gi...@apache.org>.
lordgamez commented on a change in pull request #891:
URL: https://github.com/apache/nifi-minifi-cpp/pull/891#discussion_r482990129



##########
File path: extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp
##########
@@ -335,21 +335,26 @@ TEST_CASE("ConsumeWindowsEventLog prints events in XML correctly", "[onTrigger]"
     test_controller.runSession(test_plan);
 
     REQUIRE(LogTestController::getInstance().contains(R"(<Event xmlns="http://schemas.microsoft.com/win/2004/08/events/event"><System><Provider Name="Application"/>)"));
-    REQUIRE(LogTestController::getInstance().contains(R"(<EventID Qualifiers="0">14985</EventID><Level>4</Level><Task>0</Task><Keywords>0x80000000000000</Keywords><TimeCreated SystemTime=")"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<EventID Qualifiers="0">14985</EventID>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<Level>4</Level>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<Task>0</Task>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<Keywords>0x80000000000000</Keywords><TimeCreated SystemTime=")"));
     // the timestamp (when the event was published) goes here
     REQUIRE(LogTestController::getInstance().contains(R"("/><EventRecordID>)"));
     // the ID of the event goes here (a number)
-    REQUIRE(LogTestController::getInstance().contains(R"(</EventRecordID><Channel>Application</Channel><Computer>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(</EventRecordID>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<Channel>Application</Channel><Computer>)"));

Review comment:
       I weakened the condition because the test was failing on my system: the event format was something like [this](https://social.msdn.microsoft.com/Forums/windowsdesktop/en-US/5383445d-437e-4ad5-bf03-9584aef9527a/httpschemasmicrosoftcomwin200408eventseventquotgt?forum=windowsdesktopsearchhelp). There are a couple of additional tags in between, such as the Correlation and Execution tags after EventRecordID, which seem to be valid according to the schema. I am not sure what causes the difference.
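   One possible middle ground, sketched under the assumption that the captured log is available as a `std::string`: check that the fragments appear in order, tolerating extra schema-valid elements (such as `<Correlation/>` or `<Execution/>`) between them while still rejecting reordering. `containsInOrder` is hypothetical, not an existing `LogTestController` method, and it would still not detect fragments spread across several separate XML blocks.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical helper: true if every fragment occurs in `log` in the given
// order, with arbitrary text allowed between consecutive fragments.
bool containsInOrder(const std::string& log, const std::vector<std::string>& fragments) {
  std::string::size_type pos = 0;
  for (const auto& fragment : fragments) {
    const auto found = log.find(fragment, pos);
    if (found == std::string::npos) return false;
    pos = found + fragment.size();  // the next fragment must start after this one
  }
  return true;
}
```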







[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #891: MINIFICPP-1349 ConsumeWindowsEventLogs should honor batch commit size in a single session

Posted by GitBox <gi...@apache.org>.
lordgamez commented on a change in pull request #891:
URL: https://github.com/apache/nifi-minifi-cpp/pull/891#discussion_r484783867



##########
File path: extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp
##########
@@ -367,27 +372,19 @@ void batchCommitSizeTestHelper(int batch_commit_size, int expected_num_commits)
   test_plan->reset();
   LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
 
-  {
-    reportEvent(APPLICATION_CHANNEL, "Event one");
-    reportEvent(APPLICATION_CHANNEL, "Event two");
-    reportEvent(APPLICATION_CHANNEL, "Event three");
-    reportEvent(APPLICATION_CHANNEL, "Event four");
-    reportEvent(APPLICATION_CHANNEL, "Event five");
-
-    test_controller.runSession(test_plan);
-
-    REQUIRE(LogTestController::getInstance().countOccurrences("processQueue commit") == expected_num_commits);
-  }
+  std::vector<std::string> events{"Event one", "Event two", "Event three", "Event four", "Event five"};

Review comment:
       I probably just wanted to keep it on a single line, but yes, I see that a simple for loop is better here. Fixed in [fa06429](https://github.com/apache/nifi-minifi-cpp/pull/891/commits/fa064298d99a1e0efc2609548062efd5f815085a)
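Commit fa06429 itself is not quoted here. As a rough sketch of the kind of loop the comment refers to, the `std::for_each` call from the diff can be replaced with a range-based `for`. `APPLICATION_CHANNEL` and `reportEvent()` below are stubs whose signatures are assumptions; the real helpers live in `ConsumeWindowsEventLogTests.cpp` and write to the Windows event log.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Stand-ins (assumptions) for the test file's APPLICATION_CHANNEL constant and
// reportEvent() helper. This stub only records what it was asked to report,
// so the sketch runs on any platform.
const char* const APPLICATION_CHANNEL = "Application";
std::vector<std::string> reported_messages;

void reportEvent(const char* channel, const char* message) {
  reported_messages.push_back(std::string(channel) + ": " + message);
}

void reportEvents(const std::vector<std::string>& events) {
  // A plain range-based for loop instead of std::for_each with a lambda.
  for (const auto& event : events) {
    reportEvent(APPLICATION_CHANNEL, event.c_str());
  }
}
```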







[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #891: MINIFICPP-1349 ConsumeWindowsEventLogs should honor batch commit size in a single session

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #891:
URL: https://github.com/apache/nifi-minifi-cpp/pull/891#discussion_r489443894



##########
File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
##########
@@ -298,120 +298,111 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
   logger_->log_trace("Successfully configured CWEL");
 }
 
+bool ConsumeWindowsEventLog::commitAndSaveBookmark(const std::wstring &bookmark_xml, const std::shared_ptr<core::ProcessSession> &session) {
+  {
+    const TimeDiff time_diff;
+    session->commit();
+    logger_->log_debug("processQueue commit took %" PRId64 " ms", time_diff());
+  }
+
+  if (!bookmark_->saveBookmarkXml(bookmark_xml)) {
+    logger_->log_error("Failed to save bookmark xml");
+  }
+
+  if (session->outgoingConnectionsFull("success")) {
+    logger_->log_debug("Outgoing success connection is full");
+    return false;
+  }
+
+  return true;
+}
+
+std::tuple<size_t, std::wstring> ConsumeWindowsEventLog::processEventLogs(const std::shared_ptr<core::ProcessContext> &context,
+    const std::shared_ptr<core::ProcessSession> &session, const EVT_HANDLE& event_query_results) {
+  size_t processed_event_count = 0;
+  std::wstring bookmark_xml;
+  logger_->log_trace("Enumerating the events in the result set after the bookmarked event.");
+  while (processed_event_count < batch_commit_size_ || batch_commit_size_ == 0) {
+    EVT_HANDLE next_event{};
+    DWORD handles_set_count{};
+    if (!EvtNext(event_query_results, 1, &next_event, EVT_NEXT_TIMEOUT_MS, 0, &handles_set_count)) {
+      if (ERROR_NO_MORE_ITEMS != GetLastError()) {
+        LogWindowsError("Failed to get next event");
+        continue;
+        /* According to MS this iteration should only end when the return value is false AND
+          the error code is NO_MORE_ITEMS. See the following page for further details:
+          https://docs.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtnext */
+      }
+      break;
+    }
+
+    const auto guard_next_event = gsl::finally([next_event]() { EvtClose(next_event); });
+    logger_->log_trace("Succesfully got the next event, performing event rendering");
+    EventRender event_render;
+    std::wstring new_bookmark_xml;
+    if (createEventRender(next_event, event_render) && bookmark_->getNewBookmarkXml(next_event, new_bookmark_xml)) {
+      bookmark_xml = std::move(new_bookmark_xml);
+      processed_event_count++;
+      putEventRenderFlowFileToSession(event_render, *session);
+    }
+  }
+  logger_->log_trace("Finished enumerating events.");
+  return std::make_tuple(processed_event_count, bookmark_xml);
+}
 
 void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  if (!pBookmark_) {
-    logger_->log_debug("pBookmark_ is null");
+  if (!bookmark_) {
+    logger_->log_debug("bookmark_ is null");
     context->yield();
     return;
   }
 
-  std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
+  std::unique_lock<std::mutex> lock(on_trigger_mutex_, std::try_to_lock);
   if (!lock.owns_lock()) {
     logger_->log_warn("processor was triggered before previous listing finished, configuration should be revised!");
     return;
   }
 
   logger_->log_trace("CWEL onTrigger");
 
-  struct TimeDiff {
-    auto operator()() const {
-      return int64_t{ std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - time_).count() };
-    }
-    const decltype(std::chrono::steady_clock::now()) time_ = std::chrono::steady_clock::now();
-  };
-
-  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml) {
-    const TimeDiff timeDiff;
-    session->commit();
-    logger_->log_debug("processQueue commit took %" PRId64 " ms", timeDiff());
-
-    const bool successful_save = pBookmark_->saveBookmarkXml(bookmarkXml);
-    if (!successful_save) {
-      logger_->log_error("Failed to save bookmark xml");
-    }
-
-    if (session->outgoingConnectionsFull("success")) {
-      logger_->log_debug("outgoingConnectionsFull");
-      return false;
-    }
-
-    return true;
-  };
-
-  size_t eventCount = 0;
-  const TimeDiff timeDiff;
+  size_t processed_event_count = 0;
+  const TimeDiff time_diff;
   const auto timeGuard = gsl::finally([&]() {
-    logger_->log_debug("processed %zu Events in %"  PRId64 " ms", eventCount, timeDiff());
+    logger_->log_debug("processed %zu Events in %"  PRId64 " ms", processed_event_count, time_diff());
   });
 
-  size_t commitAndSaveBookmarkCount = 0;
-  std::wstring bookmarkXml;
-
-  const auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
-  if (!hEventResults) {
+  const auto event_query_results = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
+  if (!event_query_results) {
     LOG_LAST_ERROR(EvtQuery);
     context->yield();
     return;
   }
-  const auto guard_hEventResults = gsl::finally([hEventResults]() { EvtClose(hEventResults); });
+  const auto guard_event_query_results = gsl::finally([event_query_results]() { EvtClose(event_query_results); });
 
   logger_->log_trace("Retrieved results in Channel: %ls with Query: %ls", wstrChannel_.c_str(), wstrQuery_.c_str());
 
-  auto hBookmark = pBookmark_->getBookmarkHandleFromXML();
-  if (!hBookmark) {
-    logger_->log_error("hBookmark is null, unrecoverable error!"); 
-    pBookmark_.reset();
+  auto bookmark_handle = bookmark_->getBookmarkHandleFromXML();
+  if (!bookmark_handle) {
+    logger_->log_error("bookmark_handle is null, unrecoverable error!");
+    bookmark_.reset();
     context->yield();
     return;
   }
 
-  if (!EvtSeek(hEventResults, 1, hBookmark, 0, EvtSeekRelativeToBookmark)) {
+  if (!EvtSeek(event_query_results, 1, bookmark_handle, 0, EvtSeekRelativeToBookmark)) {
     LOG_LAST_ERROR(EvtSeek);
     context->yield();
     return;
   }
 
   refreshTimeZoneData();
 
-  logger_->log_trace("Enumerating the events in the result set after the bookmarked event.");
-  while (true) {
-    EVT_HANDLE hEvent{};
-    DWORD dwReturned{};
-    if (!EvtNext(hEventResults, 1, &hEvent, EVT_NEXT_TIMEOUT_MS, 0, &dwReturned)) {
-      if (ERROR_NO_MORE_ITEMS != GetLastError()) {
-        LogWindowsError("Failed to get next event");
-        continue; 
-        /* According to MS this iteration should only end when the return value is false AND 
-         the error code is NO_MORE_ITEMS. See the following page for further details:
-         https://docs.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtnext */
-      }
-      break;
-    }
-    const auto guard_hEvent = gsl::finally([hEvent]() { EvtClose(hEvent); });
-    logger_->log_trace("Succesfully get the next hEvent, performing event rendering");
-    EventRender eventRender;
-    std::wstring newBookmarkXml;
-    if (createEventRender(hEvent, eventRender) && pBookmark_->getNewBookmarkXml(hEvent, newBookmarkXml)) {
-      bookmarkXml = std::move(newBookmarkXml);
-      eventCount++;
-      putEventRenderFlowFileToSession(eventRender, *session);
-
-      if (batch_commit_size_ != 0U && (eventCount % batch_commit_size_ == 0)) {
-        if (!commitAndSaveBookmark(bookmarkXml)) {
-          context->yield();
-          return;
-        }
-
-        commitAndSaveBookmarkCount = eventCount;
-      }
-    }
-  }
-
-  logger_->log_trace("Finish enumerating events.");
+  std::wstring bookmark_xml;
+  std::tie(processed_event_count, bookmark_xml) = processEventLogs(context, session, event_query_results);
 
-  if (eventCount > commitAndSaveBookmarkCount) {
-    commitAndSaveBookmark(bookmarkXml);
+  if (processed_event_count == 0 || !commitAndSaveBookmark(bookmark_xml, session)) {
+    context->yield();

Review comment:
       I think we generally want to yield when there is nothing to do, so that we wait a bit longer between `onTrigger()` calls and don't waste resources. @adebreceni Do you know why GetFile does not yield when it has nothing to do?
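The condition under discussion, `processed_event_count == 0 || !commitAndSaveBookmark(...)`, yields in two cases: no events were found, or the outgoing `success` connection is full after the commit. A minimal sketch that isolates that decision and contrasts it with a hypothetical GetFile-like policy that yields only on back pressure; `YieldPolicy` and `shouldYield` are illustrative names, not MiNiFi APIs:

```cpp
#include <cassert>
#include <cstddef>

enum class YieldPolicy {
  YieldWhenIdle,           // behaviour of the quoted diff
  YieldOnlyOnBackPressure  // hypothetical GetFile-like alternative
};

// Decides whether onTrigger() should call context->yield() after a run.
// `success_queue_full` stands in for session->outgoingConnectionsFull("success"),
// which commitAndSaveBookmark() checks after committing; in the diff, a full
// queue makes commitAndSaveBookmark() return false.
bool shouldYield(YieldPolicy policy, std::size_t processed_event_count, bool success_queue_full) {
  if (processed_event_count == 0) {
    // Nothing to commit: the diff yields here, a GetFile-like processor would not.
    return policy == YieldPolicy::YieldWhenIdle;
  }
  // Events were committed: both policies yield only if the outgoing queue is full.
  return success_queue_full;
}
```

The two policies differ only when no events were processed, which is the case raised in this comment.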







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #891: MINIFICPP-1349 ConsumeWindowsEventLogs should honor batch commit size in a single session

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #891:
URL: https://github.com/apache/nifi-minifi-cpp/pull/891#discussion_r489312613



##########
File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
##########
@@ -298,120 +298,111 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
   logger_->log_trace("Successfully configured CWEL");
 }
 
+bool ConsumeWindowsEventLog::commitAndSaveBookmark(const std::wstring &bookmark_xml, const std::shared_ptr<core::ProcessSession> &session) {
+  {
+    const TimeDiff time_diff;
+    session->commit();
+    logger_->log_debug("processQueue commit took %" PRId64 " ms", time_diff());
+  }
+
+  if (!bookmark_->saveBookmarkXml(bookmark_xml)) {
+    logger_->log_error("Failed to save bookmark xml");
+  }
+
+  if (session->outgoingConnectionsFull("success")) {
+    logger_->log_debug("Outgoing success connection is full");
+    return false;
+  }
+
+  return true;
+}
+
+std::tuple<size_t, std::wstring> ConsumeWindowsEventLog::processEventLogs(const std::shared_ptr<core::ProcessContext> &context,
+    const std::shared_ptr<core::ProcessSession> &session, const EVT_HANDLE& event_query_results) {
+  size_t processed_event_count = 0;
+  std::wstring bookmark_xml;
+  logger_->log_trace("Enumerating the events in the result set after the bookmarked event.");
+  while (processed_event_count < batch_commit_size_ || batch_commit_size_ == 0) {
+    EVT_HANDLE next_event{};
+    DWORD handles_set_count{};
+    if (!EvtNext(event_query_results, 1, &next_event, EVT_NEXT_TIMEOUT_MS, 0, &handles_set_count)) {
+      if (ERROR_NO_MORE_ITEMS != GetLastError()) {
+        LogWindowsError("Failed to get next event");
+        continue;
+        /* According to MS this iteration should only end when the return value is false AND
+          the error code is NO_MORE_ITEMS. See the following page for further details:
+          https://docs.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtnext */
+      }
+      break;
+    }
+
+    const auto guard_next_event = gsl::finally([next_event]() { EvtClose(next_event); });
+    logger_->log_trace("Succesfully got the next event, performing event rendering");
+    EventRender event_render;
+    std::wstring new_bookmark_xml;
+    if (createEventRender(next_event, event_render) && bookmark_->getNewBookmarkXml(next_event, new_bookmark_xml)) {
+      bookmark_xml = std::move(new_bookmark_xml);
+      processed_event_count++;
+      putEventRenderFlowFileToSession(event_render, *session);
+    }
+  }
+  logger_->log_trace("Finished enumerating events.");
+  return std::make_tuple(processed_event_count, bookmark_xml);
+}
 
 void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  if (!pBookmark_) {
-    logger_->log_debug("pBookmark_ is null");
+  if (!bookmark_) {
+    logger_->log_debug("bookmark_ is null");
     context->yield();
     return;
   }
 
-  std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
+  std::unique_lock<std::mutex> lock(on_trigger_mutex_, std::try_to_lock);
   if (!lock.owns_lock()) {
     logger_->log_warn("processor was triggered before previous listing finished, configuration should be revised!");
     return;
   }
 
   logger_->log_trace("CWEL onTrigger");
 
-  struct TimeDiff {
-    auto operator()() const {
-      return int64_t{ std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - time_).count() };
-    }
-    const decltype(std::chrono::steady_clock::now()) time_ = std::chrono::steady_clock::now();
-  };
-
-  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml) {
-    const TimeDiff timeDiff;
-    session->commit();
-    logger_->log_debug("processQueue commit took %" PRId64 " ms", timeDiff());
-
-    const bool successful_save = pBookmark_->saveBookmarkXml(bookmarkXml);
-    if (!successful_save) {
-      logger_->log_error("Failed to save bookmark xml");
-    }
-
-    if (session->outgoingConnectionsFull("success")) {
-      logger_->log_debug("outgoingConnectionsFull");
-      return false;
-    }
-
-    return true;
-  };
-
-  size_t eventCount = 0;
-  const TimeDiff timeDiff;
+  size_t processed_event_count = 0;
+  const TimeDiff time_diff;
   const auto timeGuard = gsl::finally([&]() {
-    logger_->log_debug("processed %zu Events in %"  PRId64 " ms", eventCount, timeDiff());
+    logger_->log_debug("processed %zu Events in %"  PRId64 " ms", processed_event_count, time_diff());
   });
 
-  size_t commitAndSaveBookmarkCount = 0;
-  std::wstring bookmarkXml;
-
-  const auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
-  if (!hEventResults) {
+  const auto event_query_results = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
+  if (!event_query_results) {
     LOG_LAST_ERROR(EvtQuery);
     context->yield();
     return;
   }
-  const auto guard_hEventResults = gsl::finally([hEventResults]() { EvtClose(hEventResults); });
+  const auto guard_event_query_results = gsl::finally([event_query_results]() { EvtClose(event_query_results); });
 
   logger_->log_trace("Retrieved results in Channel: %ls with Query: %ls", wstrChannel_.c_str(), wstrQuery_.c_str());
 
-  auto hBookmark = pBookmark_->getBookmarkHandleFromXML();
-  if (!hBookmark) {
-    logger_->log_error("hBookmark is null, unrecoverable error!"); 
-    pBookmark_.reset();
+  auto bookmark_handle = bookmark_->getBookmarkHandleFromXML();
+  if (!bookmark_handle) {
+    logger_->log_error("bookmark_handle is null, unrecoverable error!");
+    bookmark_.reset();
     context->yield();
     return;
   }
 
-  if (!EvtSeek(hEventResults, 1, hBookmark, 0, EvtSeekRelativeToBookmark)) {
+  if (!EvtSeek(event_query_results, 1, bookmark_handle, 0, EvtSeekRelativeToBookmark)) {
     LOG_LAST_ERROR(EvtSeek);
     context->yield();
     return;
   }
 
   refreshTimeZoneData();
 
-  logger_->log_trace("Enumerating the events in the result set after the bookmarked event.");
-  while (true) {
-    EVT_HANDLE hEvent{};
-    DWORD dwReturned{};
-    if (!EvtNext(hEventResults, 1, &hEvent, EVT_NEXT_TIMEOUT_MS, 0, &dwReturned)) {
-      if (ERROR_NO_MORE_ITEMS != GetLastError()) {
-        LogWindowsError("Failed to get next event");
-        continue; 
-        /* According to MS this iteration should only end when the return value is false AND 
-         the error code is NO_MORE_ITEMS. See the following page for further details:
-         https://docs.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtnext */
-      }
-      break;
-    }
-    const auto guard_hEvent = gsl::finally([hEvent]() { EvtClose(hEvent); });
-    logger_->log_trace("Succesfully get the next hEvent, performing event rendering");
-    EventRender eventRender;
-    std::wstring newBookmarkXml;
-    if (createEventRender(hEvent, eventRender) && pBookmark_->getNewBookmarkXml(hEvent, newBookmarkXml)) {
-      bookmarkXml = std::move(newBookmarkXml);
-      eventCount++;
-      putEventRenderFlowFileToSession(eventRender, *session);
-
-      if (batch_commit_size_ != 0U && (eventCount % batch_commit_size_ == 0)) {
-        if (!commitAndSaveBookmark(bookmarkXml)) {
-          context->yield();
-          return;
-        }
-
-        commitAndSaveBookmarkCount = eventCount;
-      }
-    }
-  }
-
-  logger_->log_trace("Finish enumerating events.");
+  std::wstring bookmark_xml;
+  std::tie(processed_event_count, bookmark_xml) = processEventLogs(context, session, event_query_results);
 
-  if (eventCount > commitAndSaveBookmarkCount) {
-    commitAndSaveBookmark(bookmarkXml);
+  if (processed_event_count == 0 || !commitAndSaveBookmark(bookmark_xml, session)) {
+    context->yield();

Review comment:
       Are we sure we want to yield a CWEL processor that has no events to process? For example, GetFile does not yield if the input directory is empty.
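How often the idle case occurs follows from the new loop condition: a single `onTrigger()` handles at most `batch_commit_size_` events (or all of them when it is 0), so a backlog is drained over several triggers, and the trigger after the last batch finds nothing. A simplified model, assuming a `std::deque` stands in for the events after the saved bookmark and that rendering and bookmark updates never fail; `processOneTrigger` and `drain` are hypothetical names:

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Simplified model of the loop condition in processEventLogs():
//   while (processed_event_count < batch_commit_size_ || batch_commit_size_ == 0)
// Every fetched event is assumed to render successfully and count as processed.
std::size_t processOneTrigger(std::deque<std::string>& pending, std::size_t batch_commit_size) {
  std::size_t processed_event_count = 0;
  while (processed_event_count < batch_commit_size || batch_commit_size == 0) {
    if (pending.empty()) {
      break;  // corresponds to EvtNext failing with ERROR_NO_MORE_ITEMS
    }
    pending.pop_front();
    ++processed_event_count;
  }
  return processed_event_count;
}

// Runs triggers until one of them finds nothing; returns the per-trigger counts.
std::vector<std::size_t> drain(std::deque<std::string> pending, std::size_t batch_commit_size) {
  std::vector<std::size_t> counts;
  for (;;) {
    const std::size_t count = processOneTrigger(pending, batch_commit_size);
    counts.push_back(count);
    if (count == 0) {
      break;
    }
  }
  return counts;
}
```

With five pending events and a batch size of 2, the per-trigger counts are 2, 2, 1 and then 0; only that final, empty trigger is affected by the yield decision.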







[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #891: MINIFICPP-1349 ConsumeWindowsEventLogs should honor batch commit size in a single session

Posted by GitBox <gi...@apache.org>.
lordgamez commented on a change in pull request #891:
URL: https://github.com/apache/nifi-minifi-cpp/pull/891#discussion_r484783119



##########
File path: extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp
##########
@@ -367,27 +372,19 @@ void batchCommitSizeTestHelper(int batch_commit_size, int expected_num_commits)
   test_plan->reset();
   LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
 
-  {
-    reportEvent(APPLICATION_CHANNEL, "Event one");
-    reportEvent(APPLICATION_CHANNEL, "Event two");
-    reportEvent(APPLICATION_CHANNEL, "Event three");
-    reportEvent(APPLICATION_CHANNEL, "Event four");
-    reportEvent(APPLICATION_CHANNEL, "Event five");
-
-    test_controller.runSession(test_plan);
-
-    REQUIRE(LogTestController::getInstance().countOccurrences("processQueue commit") == expected_num_commits);
-  }
+  std::vector<std::string> events{"Event one", "Event two", "Event three", "Event four", "Event five"};
+  std::for_each(events.begin(), events.end(), [](const std::string& event){ reportEvent(APPLICATION_CHANNEL, event.c_str()); });
+  test_controller.runSession(test_plan);
+  auto expected_event_count = events.size() <= batch_commit_size || batch_commit_size == 0 ? events.size() : batch_commit_size;
+  REQUIRE(LogTestController::getInstance().contains("processed " + std::to_string(expected_event_count) + " Events"));
 }
 
 }  // namespace
 
 TEST_CASE("ConsumeWindowsEventLog batch commit size works", "[onTrigger]") {
-  batchCommitSizeTestHelper(1000, 1);
-  batchCommitSizeTestHelper(5, 1);
-  batchCommitSizeTestHelper(4, 2);
-  batchCommitSizeTestHelper(3, 2);
-  batchCommitSizeTestHelper(2, 3);
-  batchCommitSizeTestHelper(1, 5);
-  batchCommitSizeTestHelper(0, 1);
+  batchCommitSizeTestHelper(1000);
+  batchCommitSizeTestHelper(5);
+  batchCommitSizeTestHelper(4);
+  batchCommitSizeTestHelper(1);
+  batchCommitSizeTestHelper(0);

Review comment:
       You are right, I made it more explicit in [fa06429](https://github.com/apache/nifi-minifi-cpp/pull/891/commits/fa064298d99a1e0efc2609548062efd5f815085a)
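The expected count in the quoted helper is a single ternary expression. Spelled out as a separate function (a hypothetical refactor, not necessarily what fa06429 does), the rule is: a batch commit size of 0 means no limit; otherwise, one trigger processes at most that many events. With the five reported events, the `TEST_CASE` values 1000, 5, 4, 1 and 0 therefore expect 5, 5, 4, 1 and 5 processed events.

```cpp
#include <cassert>
#include <cstddef>

// Same rule as the test's expected_event_count expression:
//   events.size() <= batch_commit_size || batch_commit_size == 0 ? events.size() : batch_commit_size
// A batch commit size of 0 means "no limit", so the first trigger processes everything.
std::size_t expectedEventCount(std::size_t total_events, std::size_t batch_commit_size) {
  if (batch_commit_size == 0 || total_events <= batch_commit_size) {
    return total_events;
  }
  return batch_commit_size;
}
```

Taking `std::size_t` for the batch size also avoids comparing `events.size()` with a signed `int`, which can trigger a signed/unsigned comparison warning if the helper's parameter is still an `int` as in the original signature.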







[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #891: MINIFICPP-1349 ConsumeWindowsEventLogs should honor batch commit size in a single session

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #891:
URL: https://github.com/apache/nifi-minifi-cpp/pull/891#discussion_r483008012



##########
File path: extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp
##########
@@ -335,21 +335,26 @@ TEST_CASE("ConsumeWindowsEventLog prints events in XML correctly", "[onTrigger]"
     test_controller.runSession(test_plan);
 
     REQUIRE(LogTestController::getInstance().contains(R"(<Event xmlns="http://schemas.microsoft.com/win/2004/08/events/event"><System><Provider Name="Application"/>)"));
-    REQUIRE(LogTestController::getInstance().contains(R"(<EventID Qualifiers="0">14985</EventID><Level>4</Level><Task>0</Task><Keywords>0x80000000000000</Keywords><TimeCreated SystemTime=")"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<EventID Qualifiers="0">14985</EventID>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<Level>4</Level>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<Task>0</Task>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<Keywords>0x80000000000000</Keywords><TimeCreated SystemTime=")"));
     // the timestamp (when the event was published) goes here
     REQUIRE(LogTestController::getInstance().contains(R"("/><EventRecordID>)"));
     // the ID of the event goes here (a number)
-    REQUIRE(LogTestController::getInstance().contains(R"(</EventRecordID><Channel>Application</Channel><Computer>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(</EventRecordID>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<Channel>Application</Channel><Computer>)"));

Review comment:
       I see, that's fine then.







[GitHub] [nifi-minifi-cpp] arpadboda closed pull request #891: MINIFICPP-1349 ConsumeWindowsEventLogs should honor batch commit size in a single session

Posted by GitBox <gi...@apache.org>.
arpadboda closed pull request #891:
URL: https://github.com/apache/nifi-minifi-cpp/pull/891


   





[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #891: MINIFICPP-1349 ConsumeWindowsEventLogs should honor batch commit size in a single session

Posted by GitBox <gi...@apache.org>.
lordgamez commented on a change in pull request #891:
URL: https://github.com/apache/nifi-minifi-cpp/pull/891#discussion_r482990129



##########
File path: extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp
##########
@@ -335,21 +335,26 @@ TEST_CASE("ConsumeWindowsEventLog prints events in XML correctly", "[onTrigger]"
     test_controller.runSession(test_plan);
 
     REQUIRE(LogTestController::getInstance().contains(R"(<Event xmlns="http://schemas.microsoft.com/win/2004/08/events/event"><System><Provider Name="Application"/>)"));
-    REQUIRE(LogTestController::getInstance().contains(R"(<EventID Qualifiers="0">14985</EventID><Level>4</Level><Task>0</Task><Keywords>0x80000000000000</Keywords><TimeCreated SystemTime=")"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<EventID Qualifiers="0">14985</EventID>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<Level>4</Level>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<Task>0</Task>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<Keywords>0x80000000000000</Keywords><TimeCreated SystemTime=")"));
     // the timestamp (when the event was published) goes here
     REQUIRE(LogTestController::getInstance().contains(R"("/><EventRecordID>)"));
     // the ID of the event goes here (a number)
-    REQUIRE(LogTestController::getInstance().contains(R"(</EventRecordID><Channel>Application</Channel><Computer>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(</EventRecordID>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<Channel>Application</Channel><Computer>)"));

Review comment:
       I weakened the condition because the test was failing on my system: the event output looked something like [this](https://social.msdn.microsoft.com/Forums/windowsdesktop/en-US/5383445d-437e-4ad5-bf03-9584aef9527a/httpschemasmicrosoftcomwin200408eventseventquotgt?forum=windowsdesktopsearchhelp). There are a couple of additional tags in between, such as the Correlation and Execution tags after the EventRecordID, which seem to be valid according to the schema. I am not sure what causes the difference.







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #891: MINIFICPP-1349 ConsumeWindowsEventLogs should honor batch commit size in a single session

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #891:
URL: https://github.com/apache/nifi-minifi-cpp/pull/891#discussion_r489454269



##########
File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
##########
@@ -298,120 +298,111 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
   logger_->log_trace("Successfully configured CWEL");
 }
 
+bool ConsumeWindowsEventLog::commitAndSaveBookmark(const std::wstring &bookmark_xml, const std::shared_ptr<core::ProcessSession> &session) {
+  {
+    const TimeDiff time_diff;
+    session->commit();
+    logger_->log_debug("processQueue commit took %" PRId64 " ms", time_diff());
+  }
+
+  if (!bookmark_->saveBookmarkXml(bookmark_xml)) {
+    logger_->log_error("Failed to save bookmark xml");
+  }
+
+  if (session->outgoingConnectionsFull("success")) {
+    logger_->log_debug("Outgoing success connection is full");
+    return false;
+  }
+
+  return true;
+}
+
+std::tuple<size_t, std::wstring> ConsumeWindowsEventLog::processEventLogs(const std::shared_ptr<core::ProcessContext> &context,
+    const std::shared_ptr<core::ProcessSession> &session, const EVT_HANDLE& event_query_results) {
+  size_t processed_event_count = 0;
+  std::wstring bookmark_xml;
+  logger_->log_trace("Enumerating the events in the result set after the bookmarked event.");
+  while (processed_event_count < batch_commit_size_ || batch_commit_size_ == 0) {
+    EVT_HANDLE next_event{};
+    DWORD handles_set_count{};
+    if (!EvtNext(event_query_results, 1, &next_event, EVT_NEXT_TIMEOUT_MS, 0, &handles_set_count)) {
+      if (ERROR_NO_MORE_ITEMS != GetLastError()) {
+        LogWindowsError("Failed to get next event");
+        continue;
+        /* According to MS this iteration should only end when the return value is false AND
+          the error code is NO_MORE_ITEMS. See the following page for further details:
+          https://docs.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtnext */
+      }
+      break;
+    }
+
+    const auto guard_next_event = gsl::finally([next_event]() { EvtClose(next_event); });
+    logger_->log_trace("Succesfully got the next event, performing event rendering");
+    EventRender event_render;
+    std::wstring new_bookmark_xml;
+    if (createEventRender(next_event, event_render) && bookmark_->getNewBookmarkXml(next_event, new_bookmark_xml)) {
+      bookmark_xml = std::move(new_bookmark_xml);
+      processed_event_count++;
+      putEventRenderFlowFileToSession(event_render, *session);
+    }
+  }
+  logger_->log_trace("Finished enumerating events.");
+  return std::make_tuple(processed_event_count, bookmark_xml);
+}
 
 void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  if (!pBookmark_) {
-    logger_->log_debug("pBookmark_ is null");
+  if (!bookmark_) {
+    logger_->log_debug("bookmark_ is null");
     context->yield();
     return;
   }
 
-  std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
+  std::unique_lock<std::mutex> lock(on_trigger_mutex_, std::try_to_lock);
   if (!lock.owns_lock()) {
     logger_->log_warn("processor was triggered before previous listing finished, configuration should be revised!");
     return;
   }
 
   logger_->log_trace("CWEL onTrigger");
 
-  struct TimeDiff {
-    auto operator()() const {
-      return int64_t{ std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - time_).count() };
-    }
-    const decltype(std::chrono::steady_clock::now()) time_ = std::chrono::steady_clock::now();
-  };
-
-  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml) {
-    const TimeDiff timeDiff;
-    session->commit();
-    logger_->log_debug("processQueue commit took %" PRId64 " ms", timeDiff());
-
-    const bool successful_save = pBookmark_->saveBookmarkXml(bookmarkXml);
-    if (!successful_save) {
-      logger_->log_error("Failed to save bookmark xml");
-    }
-
-    if (session->outgoingConnectionsFull("success")) {
-      logger_->log_debug("outgoingConnectionsFull");
-      return false;
-    }
-
-    return true;
-  };
-
-  size_t eventCount = 0;
-  const TimeDiff timeDiff;
+  size_t processed_event_count = 0;
+  const TimeDiff time_diff;
   const auto timeGuard = gsl::finally([&]() {
-    logger_->log_debug("processed %zu Events in %"  PRId64 " ms", eventCount, timeDiff());
+    logger_->log_debug("processed %zu Events in %"  PRId64 " ms", processed_event_count, time_diff());
   });
 
-  size_t commitAndSaveBookmarkCount = 0;
-  std::wstring bookmarkXml;
-
-  const auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
-  if (!hEventResults) {
+  const auto event_query_results = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
+  if (!event_query_results) {
     LOG_LAST_ERROR(EvtQuery);
     context->yield();
     return;
   }
-  const auto guard_hEventResults = gsl::finally([hEventResults]() { EvtClose(hEventResults); });
+  const auto guard_event_query_results = gsl::finally([event_query_results]() { EvtClose(event_query_results); });
 
   logger_->log_trace("Retrieved results in Channel: %ls with Query: %ls", wstrChannel_.c_str(), wstrQuery_.c_str());
 
-  auto hBookmark = pBookmark_->getBookmarkHandleFromXML();
-  if (!hBookmark) {
-    logger_->log_error("hBookmark is null, unrecoverable error!"); 
-    pBookmark_.reset();
+  auto bookmark_handle = bookmark_->getBookmarkHandleFromXML();
+  if (!bookmark_handle) {
+    logger_->log_error("bookmark_handle is null, unrecoverable error!");
+    bookmark_.reset();
     context->yield();
     return;
   }
 
-  if (!EvtSeek(hEventResults, 1, hBookmark, 0, EvtSeekRelativeToBookmark)) {
+  if (!EvtSeek(event_query_results, 1, bookmark_handle, 0, EvtSeekRelativeToBookmark)) {
     LOG_LAST_ERROR(EvtSeek);
     context->yield();
     return;
   }
 
   refreshTimeZoneData();
 
-  logger_->log_trace("Enumerating the events in the result set after the bookmarked event.");
-  while (true) {
-    EVT_HANDLE hEvent{};
-    DWORD dwReturned{};
-    if (!EvtNext(hEventResults, 1, &hEvent, EVT_NEXT_TIMEOUT_MS, 0, &dwReturned)) {
-      if (ERROR_NO_MORE_ITEMS != GetLastError()) {
-        LogWindowsError("Failed to get next event");
-        continue; 
-        /* According to MS this iteration should only end when the return value is false AND 
-         the error code is NO_MORE_ITEMS. See the following page for further details:
-         https://docs.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtnext */
-      }
-      break;
-    }
-    const auto guard_hEvent = gsl::finally([hEvent]() { EvtClose(hEvent); });
-    logger_->log_trace("Succesfully get the next hEvent, performing event rendering");
-    EventRender eventRender;
-    std::wstring newBookmarkXml;
-    if (createEventRender(hEvent, eventRender) && pBookmark_->getNewBookmarkXml(hEvent, newBookmarkXml)) {
-      bookmarkXml = std::move(newBookmarkXml);
-      eventCount++;
-      putEventRenderFlowFileToSession(eventRender, *session);
-
-      if (batch_commit_size_ != 0U && (eventCount % batch_commit_size_ == 0)) {
-        if (!commitAndSaveBookmark(bookmarkXml)) {
-          context->yield();
-          return;
-        }
-
-        commitAndSaveBookmarkCount = eventCount;
-      }
-    }
-  }
-
-  logger_->log_trace("Finish enumerating events.");
+  std::wstring bookmark_xml;
+  std::tie(processed_event_count, bookmark_xml) = processEventLogs(context, session, event_query_results);
 
-  if (eventCount > commitAndSaveBookmarkCount) {
-    commitAndSaveBookmark(bookmarkXml);
+  if (processed_event_count == 0 || !commitAndSaveBookmark(bookmark_xml, session)) {
+    context->yield();

Review comment:
       I don't know why GetFile doesn't yield, but if the purpose of `yield` is resource optimization, it rightfully belongs here.
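A minimal sketch of the yield decision in the `+` line above (`processed_event_count == 0 || !commitAndSaveBookmark(...)`). It assumes a hypothetical `FakeSession` stand-in rather than the real `core::ProcessSession`, and `shouldYield` / `commitAndCheckBackpressure` are illustrative names, not MiNiFi functions. `onTrigger` yields when nothing was read or when the commit leaves the `success` connection full. Short-circuiting means an empty poll is never committed.

```cpp
#include <cstddef>

// Hypothetical stand-in for core::ProcessSession; it only tracks what this
// sketch needs and is NOT the MiNiFi API.
struct FakeSession {
  bool outgoing_full = false;  // models outgoingConnectionsFull("success")
  int commits = 0;             // counts commit() calls
};

// Mirrors the contract of commitAndSaveBookmark: commit first, then report
// whether the downstream connection still has room (true = keep going).
bool commitAndCheckBackpressure(FakeSession& session) {
  ++session.commits;
  return !session.outgoing_full;
}

// Hypothetical helper: true when onTrigger should call context->yield().
// Because of short-circuit evaluation, an empty batch is never committed.
bool shouldYield(std::size_t processed_event_count, FakeSession& session) {
  return processed_event_count == 0 || !commitAndCheckBackpressure(session);
}
```

With the `processed_event_count > 0 && !commitAndSaveBookmark(...)` form that appears in another diff in this thread, an empty poll neither commits nor yields. The `== 0 ||` form also backs off when the channel is idle, which is the resource-saving behaviour this comment argues for.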




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] arpadboda closed pull request #891: MINIFICPP-1349 ConsumeWindowsEventLogs should honor batch commit size in a single session

Posted by GitBox <gi...@apache.org>.
arpadboda closed pull request #891:
URL: https://github.com/apache/nifi-minifi-cpp/pull/891


   





[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #891: MINIFICPP-1349 ConsumeWindowsEventLogs should honor batch commit size in a single session

Posted by GitBox <gi...@apache.org>.
lordgamez commented on a change in pull request #891:
URL: https://github.com/apache/nifi-minifi-cpp/pull/891#discussion_r484781607



##########
File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
##########
@@ -298,120 +298,108 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
   logger_->log_trace("Successfully configured CWEL");
 }
 
+bool ConsumeWindowsEventLog::commitAndSaveBookmark(const std::wstring &bookmark_xml, const std::shared_ptr<core::ProcessSession> &session) {
+  {
+    const TimeDiff time_diff;
+    session->commit();
+    logger_->log_debug("processQueue commit took %" PRId64 " ms", time_diff());
+  }
+
+  if (!bookmark_->saveBookmarkXml(bookmark_xml)) {
+    logger_->log_error("Failed to save bookmark xml");
+  }
+
+  if (session->outgoingConnectionsFull("success")) {
+    logger_->log_debug("Outgoing success connection is full");
+    return false;
+  }
+
+  return true;
+}
+
+std::wstring ConsumeWindowsEventLog::processEventLogs(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session,
+    size_t& processed_event_count, const EVT_HANDLE& event_query_results) {

Review comment:
       As both are output parameters, I decided to return a tuple containing both values. Added in [24855ee](https://github.com/apache/nifi-minifi-cpp/pull/891/commits/24855eeb70fffe2f33b065bcd75dc77f8d5aa3f5)
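The tuple return described in this comment is consumed by the `std::tie(processed_event_count, bookmark_xml) = processEventLogs(...)` line in the first diff. The following is a minimal sketch of the pattern. It assumes a hypothetical `processEvents` that takes plain strings instead of an `EVT_HANDLE` result set, and it omits rendering and session handling.

```cpp
#include <cstddef>
#include <string>
#include <tuple>
#include <vector>

// Hypothetical stand-in for processEventLogs: instead of writing the count
// through a size_t& out-parameter, it returns the count and the latest
// bookmark together. Each string plays the role of one event's bookmark XML.
std::tuple<std::size_t, std::wstring> processEvents(const std::vector<std::wstring>& events) {
  std::size_t processed_event_count = 0;
  std::wstring bookmark_xml;
  for (const auto& event : events) {
    bookmark_xml = event;  // the bookmark tracks the last processed event
    ++processed_event_count;
  }
  return std::make_tuple(processed_event_count, bookmark_xml);
}
```

`std::tie` fits better than C++17 structured bindings here because `processed_event_count` must exist before the call. The `timeGuard` lambda captures it by reference so it can log the final count when `onTrigger` returns, and `std::tie` assigns into that existing variable instead of declaring a new one.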







[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #891: MINIFICPP-1349 ConsumeWindowsEventLogs should honor batch commit size in a single session

Posted by GitBox <gi...@apache.org>.
lordgamez commented on a change in pull request #891:
URL: https://github.com/apache/nifi-minifi-cpp/pull/891#discussion_r484781905



##########
File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
##########
@@ -298,120 +298,108 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
   logger_->log_trace("Successfully configured CWEL");
 }
 
+bool ConsumeWindowsEventLog::commitAndSaveBookmark(const std::wstring &bookmark_xml, const std::shared_ptr<core::ProcessSession> &session) {
+  {
+    const TimeDiff time_diff;
+    session->commit();
+    logger_->log_debug("processQueue commit took %" PRId64 " ms", time_diff());
+  }
+
+  if (!bookmark_->saveBookmarkXml(bookmark_xml)) {
+    logger_->log_error("Failed to save bookmark xml");
+  }
+
+  if (session->outgoingConnectionsFull("success")) {
+    logger_->log_debug("Outgoing success connection is full");
+    return false;
+  }
+
+  return true;
+}
+
+std::wstring ConsumeWindowsEventLog::processEventLogs(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session,
+    size_t& processed_event_count, const EVT_HANDLE& event_query_results) {
+  std::wstring bookmark_xml;
+  logger_->log_trace("Enumerating the events in the result set after the bookmarked event.");
+  while (processed_event_count < batch_commit_size_ || batch_commit_size_ == 0) {
+    EVT_HANDLE next_event{};
+    DWORD handles_set_count{};
+    if (!EvtNext(event_query_results, 1, &next_event, EVT_NEXT_TIMEOUT_MS, 0, &handles_set_count)) {
+      if (ERROR_NO_MORE_ITEMS != GetLastError()) {
+        LogWindowsError("Failed to get next event");
+        continue;
+        /* According to MS this iteration should only end when the return value is false AND
+          the error code is NO_MORE_ITEMS. See the following page for further details:
+          https://docs.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtnext */
+      }
+      break;
+    }
+
+    const auto guard_next_event = gsl::finally([next_event]() { EvtClose(next_event); });
+    logger_->log_trace("Succesfully got the next event, performing event rendering");
+    EventRender event_render;
+    std::wstring new_bookmark_xml;
+    if (createEventRender(next_event, event_render) && bookmark_->getNewBookmarkXml(next_event, new_bookmark_xml)) {
+      bookmark_xml = std::move(new_bookmark_xml);
+      processed_event_count++;
+      putEventRenderFlowFileToSession(event_render, *session);
+    }
+  }
+  logger_->log_trace("Finished enumerating events.");
+  return bookmark_xml;
+}
 
 void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  if (!pBookmark_) {
-    logger_->log_debug("pBookmark_ is null");
+  if (!bookmark_) {
+    logger_->log_debug("bookmark_ is null");
     context->yield();
     return;
   }
 
-  std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
+  std::unique_lock<std::mutex> lock(on_trigger_mutex_, std::try_to_lock);
   if (!lock.owns_lock()) {
     logger_->log_warn("processor was triggered before previous listing finished, configuration should be revised!");
     return;
   }
 
   logger_->log_trace("CWEL onTrigger");
 
-  struct TimeDiff {
-    auto operator()() const {
-      return int64_t{ std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - time_).count() };
-    }
-    const decltype(std::chrono::steady_clock::now()) time_ = std::chrono::steady_clock::now();
-  };
-
-  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml) {
-    const TimeDiff timeDiff;
-    session->commit();
-    logger_->log_debug("processQueue commit took %" PRId64 " ms", timeDiff());
-
-    const bool successful_save = pBookmark_->saveBookmarkXml(bookmarkXml);
-    if (!successful_save) {
-      logger_->log_error("Failed to save bookmark xml");
-    }
-
-    if (session->outgoingConnectionsFull("success")) {
-      logger_->log_debug("outgoingConnectionsFull");
-      return false;
-    }
-
-    return true;
-  };
-
-  size_t eventCount = 0;
-  const TimeDiff timeDiff;
+  size_t processed_event_count = 0;
+  const TimeDiff time_diff;
   const auto timeGuard = gsl::finally([&]() {
-    logger_->log_debug("processed %zu Events in %"  PRId64 " ms", eventCount, timeDiff());
+    logger_->log_debug("processed %zu Events in %"  PRId64 " ms", processed_event_count, time_diff());
   });
 
-  size_t commitAndSaveBookmarkCount = 0;
-  std::wstring bookmarkXml;
-
-  const auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
-  if (!hEventResults) {
+  const auto event_query_results = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
+  if (!event_query_results) {
     LOG_LAST_ERROR(EvtQuery);
     context->yield();
     return;
   }
-  const auto guard_hEventResults = gsl::finally([hEventResults]() { EvtClose(hEventResults); });
+  const auto guard_event_query_results = gsl::finally([event_query_results]() { EvtClose(event_query_results); });
 
   logger_->log_trace("Retrieved results in Channel: %ls with Query: %ls", wstrChannel_.c_str(), wstrQuery_.c_str());
 
-  auto hBookmark = pBookmark_->getBookmarkHandleFromXML();
-  if (!hBookmark) {
-    logger_->log_error("hBookmark is null, unrecoverable error!"); 
-    pBookmark_.reset();
+  auto bookmark_handle = bookmark_->getBookmarkHandleFromXML();
+  if (!bookmark_handle) {
+    logger_->log_error("bookmark_handle is null, unrecoverable error!");
+    bookmark_.reset();
     context->yield();
     return;
   }
 
-  if (!EvtSeek(hEventResults, 1, hBookmark, 0, EvtSeekRelativeToBookmark)) {
+  if (!EvtSeek(event_query_results, 1, bookmark_handle, 0, EvtSeekRelativeToBookmark)) {
     LOG_LAST_ERROR(EvtSeek);
     context->yield();
     return;
   }
 
   refreshTimeZoneData();
+  auto bookmark_xml = processEventLogs(context,session, processed_event_count, event_query_results);
 
-  logger_->log_trace("Enumerating the events in the result set after the bookmarked event.");
-  while (true) {
-    EVT_HANDLE hEvent{};
-    DWORD dwReturned{};
-    if (!EvtNext(hEventResults, 1, &hEvent, EVT_NEXT_TIMEOUT_MS, 0, &dwReturned)) {
-      if (ERROR_NO_MORE_ITEMS != GetLastError()) {
-        LogWindowsError("Failed to get next event");
-        continue; 
-        /* According to MS this iteration should only end when the return value is false AND 
-         the error code is NO_MORE_ITEMS. See the following page for further details:
-         https://docs.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtnext */
-      }
-      break;
-    }
-    const auto guard_hEvent = gsl::finally([hEvent]() { EvtClose(hEvent); });
-    logger_->log_trace("Succesfully get the next hEvent, performing event rendering");
-    EventRender eventRender;
-    std::wstring newBookmarkXml;
-    if (createEventRender(hEvent, eventRender) && pBookmark_->getNewBookmarkXml(hEvent, newBookmarkXml)) {
-      bookmarkXml = std::move(newBookmarkXml);
-      eventCount++;
-      putEventRenderFlowFileToSession(eventRender, *session);
-
-      if (batch_commit_size_ != 0U && (eventCount % batch_commit_size_ == 0)) {
-        if (!commitAndSaveBookmark(bookmarkXml)) {
-          context->yield();
-          return;
-        }
-
-        commitAndSaveBookmarkCount = eventCount;
-      }
-    }
-  }
-
-  logger_->log_trace("Finish enumerating events.");
-
-  if (eventCount > commitAndSaveBookmarkCount) {
-    commitAndSaveBookmark(bookmarkXml);
+  if (processed_event_count > 0 && !commitAndSaveBookmark(bookmark_xml, session)) {

Review comment:
       Done in [82aac08](https://github.com/apache/nifi-minifi-cpp/pull/891/commits/82aac084e181abb4f213287ebd4e883255a73739)
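The loop condition in this diff, `processed_event_count < batch_commit_size_ || batch_commit_size_ == 0`, caps each `onTrigger` call at one batch, and a value of 0 means no limit. The following is a minimal model of that condition. It assumes a `std::deque<std::string>` in place of the `EvtNext()` result set and a hypothetical `drainBatch` helper, and it is not the processor's code.

```cpp
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical model of the batch-limited enumeration loop. The deque stands
// in for the query result set; batch_commit_size == 0 means "no limit".
std::size_t drainBatch(std::deque<std::string>& pending, std::size_t batch_commit_size) {
  std::size_t processed_event_count = 0;
  while (processed_event_count < batch_commit_size || batch_commit_size == 0) {
    if (pending.empty()) {
      break;  // corresponds to EvtNext failing with ERROR_NO_MORE_ITEMS
    }
    pending.pop_front();  // "render" the event and hand it to the session
    ++processed_event_count;
  }
  return processed_event_count;
}
```

Events left over after a capped batch are not lost in the processor itself. The bookmark is saved after the commit, and the next `onTrigger` resumes after it via `EvtSeek(..., EvtSeekRelativeToBookmark)`. In the diff, the counter advances only when both `createEventRender` and `getNewBookmarkXml` succeed, so failed events do not count toward the batch. The sketch does not model that.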



