You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/09/03 12:53:36 UTC

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #891: MINIFICPP-1349 ConsumeWindowsEventLogs should honor batch commit size in a single session

fgerlits commented on a change in pull request #891:
URL: https://github.com/apache/nifi-minifi-cpp/pull/891#discussion_r482939199



##########
File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
##########
@@ -298,120 +298,108 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
   logger_->log_trace("Successfully configured CWEL");
 }
 
+bool ConsumeWindowsEventLog::commitAndSaveBookmark(const std::wstring &bookmark_xml, const std::shared_ptr<core::ProcessSession> &session) {
+  {
+    const TimeDiff time_diff;
+    session->commit();
+    logger_->log_debug("processQueue commit took %" PRId64 " ms", time_diff());
+  }
+
+  if (!bookmark_->saveBookmarkXml(bookmark_xml)) {
+    logger_->log_error("Failed to save bookmark xml");
+  }
+
+  if (session->outgoingConnectionsFull("success")) {
+    logger_->log_debug("Outgoing success connection is full");
+    return false;
+  }
+
+  return true;
+}
+
+std::wstring ConsumeWindowsEventLog::processEventLogs(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session,
+    size_t& processed_event_count, const EVT_HANDLE& event_query_results) {
+  std::wstring bookmark_xml;
+  logger_->log_trace("Enumerating the events in the result set after the bookmarked event.");
+  while (processed_event_count < batch_commit_size_ || batch_commit_size_ == 0) {
+    EVT_HANDLE next_event{};
+    DWORD handles_set_count{};
+    if (!EvtNext(event_query_results, 1, &next_event, EVT_NEXT_TIMEOUT_MS, 0, &handles_set_count)) {
+      if (ERROR_NO_MORE_ITEMS != GetLastError()) {
+        LogWindowsError("Failed to get next event");
+        continue;
+        /* According to MS this iteration should only end when the return value is false AND
+          the error code is NO_MORE_ITEMS. See the following page for further details:
+          https://docs.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtnext */
+      }
+      break;
+    }
+
+    const auto guard_next_event = gsl::finally([next_event]() { EvtClose(next_event); });
+    logger_->log_trace("Succesfully got the next event, performing event rendering");
+    EventRender event_render;
+    std::wstring new_bookmark_xml;
+    if (createEventRender(next_event, event_render) && bookmark_->getNewBookmarkXml(next_event, new_bookmark_xml)) {
+      bookmark_xml = std::move(new_bookmark_xml);
+      processed_event_count++;
+      putEventRenderFlowFileToSession(event_render, *session);
+    }
+  }
+  logger_->log_trace("Finished enumerating events.");
+  return bookmark_xml;
+}
 
 void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  if (!pBookmark_) {
-    logger_->log_debug("pBookmark_ is null");
+  if (!bookmark_) {
+    logger_->log_debug("bookmark_ is null");
     context->yield();
     return;
   }
 
-  std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
+  std::unique_lock<std::mutex> lock(on_trigger_mutex_, std::try_to_lock);
   if (!lock.owns_lock()) {
     logger_->log_warn("processor was triggered before previous listing finished, configuration should be revised!");
     return;
   }
 
   logger_->log_trace("CWEL onTrigger");
 
-  struct TimeDiff {
-    auto operator()() const {
-      return int64_t{ std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - time_).count() };
-    }
-    const decltype(std::chrono::steady_clock::now()) time_ = std::chrono::steady_clock::now();
-  };
-
-  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml) {
-    const TimeDiff timeDiff;
-    session->commit();
-    logger_->log_debug("processQueue commit took %" PRId64 " ms", timeDiff());
-
-    const bool successful_save = pBookmark_->saveBookmarkXml(bookmarkXml);
-    if (!successful_save) {
-      logger_->log_error("Failed to save bookmark xml");
-    }
-
-    if (session->outgoingConnectionsFull("success")) {
-      logger_->log_debug("outgoingConnectionsFull");
-      return false;
-    }
-
-    return true;
-  };
-
-  size_t eventCount = 0;
-  const TimeDiff timeDiff;
+  size_t processed_event_count = 0;
+  const TimeDiff time_diff;
   const auto timeGuard = gsl::finally([&]() {
-    logger_->log_debug("processed %zu Events in %"  PRId64 " ms", eventCount, timeDiff());
+    logger_->log_debug("processed %zu Events in %"  PRId64 " ms", processed_event_count, time_diff());
   });
 
-  size_t commitAndSaveBookmarkCount = 0;
-  std::wstring bookmarkXml;
-
-  const auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
-  if (!hEventResults) {
+  const auto event_query_results = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
+  if (!event_query_results) {
     LOG_LAST_ERROR(EvtQuery);
     context->yield();
     return;
   }
-  const auto guard_hEventResults = gsl::finally([hEventResults]() { EvtClose(hEventResults); });
+  const auto guard_event_query_results = gsl::finally([event_query_results]() { EvtClose(event_query_results); });
 
   logger_->log_trace("Retrieved results in Channel: %ls with Query: %ls", wstrChannel_.c_str(), wstrQuery_.c_str());
 
-  auto hBookmark = pBookmark_->getBookmarkHandleFromXML();
-  if (!hBookmark) {
-    logger_->log_error("hBookmark is null, unrecoverable error!"); 
-    pBookmark_.reset();
+  auto bookmark_handle = bookmark_->getBookmarkHandleFromXML();
+  if (!bookmark_handle) {
+    logger_->log_error("bookmark_handle is null, unrecoverable error!");
+    bookmark_.reset();
     context->yield();
     return;
   }
 
-  if (!EvtSeek(hEventResults, 1, hBookmark, 0, EvtSeekRelativeToBookmark)) {
+  if (!EvtSeek(event_query_results, 1, bookmark_handle, 0, EvtSeekRelativeToBookmark)) {
     LOG_LAST_ERROR(EvtSeek);
     context->yield();
     return;
   }
 
   refreshTimeZoneData();
+  auto bookmark_xml = processEventLogs(context,session, processed_event_count, event_query_results);
 
-  logger_->log_trace("Enumerating the events in the result set after the bookmarked event.");
-  while (true) {
-    EVT_HANDLE hEvent{};
-    DWORD dwReturned{};
-    if (!EvtNext(hEventResults, 1, &hEvent, EVT_NEXT_TIMEOUT_MS, 0, &dwReturned)) {
-      if (ERROR_NO_MORE_ITEMS != GetLastError()) {
-        LogWindowsError("Failed to get next event");
-        continue; 
-        /* According to MS this iteration should only end when the return value is false AND 
-         the error code is NO_MORE_ITEMS. See the following page for further details:
-         https://docs.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtnext */
-      }
-      break;
-    }
-    const auto guard_hEvent = gsl::finally([hEvent]() { EvtClose(hEvent); });
-    logger_->log_trace("Succesfully get the next hEvent, performing event rendering");
-    EventRender eventRender;
-    std::wstring newBookmarkXml;
-    if (createEventRender(hEvent, eventRender) && pBookmark_->getNewBookmarkXml(hEvent, newBookmarkXml)) {
-      bookmarkXml = std::move(newBookmarkXml);
-      eventCount++;
-      putEventRenderFlowFileToSession(eventRender, *session);
-
-      if (batch_commit_size_ != 0U && (eventCount % batch_commit_size_ == 0)) {
-        if (!commitAndSaveBookmark(bookmarkXml)) {
-          context->yield();
-          return;
-        }
-
-        commitAndSaveBookmarkCount = eventCount;
-      }
-    }
-  }
-
-  logger_->log_trace("Finish enumerating events.");
-
-  if (eventCount > commitAndSaveBookmarkCount) {
-    commitAndSaveBookmark(bookmarkXml);
+  if (processed_event_count > 0 && !commitAndSaveBookmark(bookmark_xml, session)) {

Review comment:
      I think we should also yield if no events were processed.  I know we didn't do this before this change, but it seems like a bug.
   ```suggestion
     if (processed_event_count == 0 || !commitAndSaveBookmark(bookmark_xml, session)) {
   ```

##########
File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
##########
@@ -298,120 +298,108 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
   logger_->log_trace("Successfully configured CWEL");
 }
 
+bool ConsumeWindowsEventLog::commitAndSaveBookmark(const std::wstring &bookmark_xml, const std::shared_ptr<core::ProcessSession> &session) {
+  {
+    const TimeDiff time_diff;
+    session->commit();
+    logger_->log_debug("processQueue commit took %" PRId64 " ms", time_diff());
+  }
+
+  if (!bookmark_->saveBookmarkXml(bookmark_xml)) {
+    logger_->log_error("Failed to save bookmark xml");
+  }
+
+  if (session->outgoingConnectionsFull("success")) {
+    logger_->log_debug("Outgoing success connection is full");
+    return false;
+  }
+
+  return true;
+}
+
+std::wstring ConsumeWindowsEventLog::processEventLogs(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session,
+    size_t& processed_event_count, const EVT_HANDLE& event_query_results) {

Review comment:
      I would do this the other way around: return the number of events processed, and use an out parameter for the bookmark XML.  Not important, but I think that is a more common pattern.
   
   Another option would be to return a struct `{ size_t processed_event_count; std::wstring bookmark_xml; }`.

##########
File path: extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp
##########
@@ -367,27 +372,19 @@ void batchCommitSizeTestHelper(int batch_commit_size, int expected_num_commits)
   test_plan->reset();
   LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
 
-  {
-    reportEvent(APPLICATION_CHANNEL, "Event one");
-    reportEvent(APPLICATION_CHANNEL, "Event two");
-    reportEvent(APPLICATION_CHANNEL, "Event three");
-    reportEvent(APPLICATION_CHANNEL, "Event four");
-    reportEvent(APPLICATION_CHANNEL, "Event five");
-
-    test_controller.runSession(test_plan);
-
-    REQUIRE(LogTestController::getInstance().countOccurrences("processQueue commit") == expected_num_commits);
-  }
+  std::vector<std::string> events{"Event one", "Event two", "Event three", "Event four", "Event five"};
+  std::for_each(events.begin(), events.end(), [](const std::string& event){ reportEvent(APPLICATION_CHANNEL, event.c_str()); });
+  test_controller.runSession(test_plan);
+  auto expected_event_count = events.size() <= batch_commit_size || batch_commit_size == 0 ? events.size() : batch_commit_size;
+  REQUIRE(LogTestController::getInstance().contains("processed " + std::to_string(expected_event_count) + " Events"));
 }
 
 }  // namespace
 
 TEST_CASE("ConsumeWindowsEventLog batch commit size works", "[onTrigger]") {
-  batchCommitSizeTestHelper(1000, 1);
-  batchCommitSizeTestHelper(5, 1);
-  batchCommitSizeTestHelper(4, 2);
-  batchCommitSizeTestHelper(3, 2);
-  batchCommitSizeTestHelper(2, 3);
-  batchCommitSizeTestHelper(1, 5);
-  batchCommitSizeTestHelper(0, 1);
+  batchCommitSizeTestHelper(1000);
+  batchCommitSizeTestHelper(5);
+  batchCommitSizeTestHelper(4);
+  batchCommitSizeTestHelper(1);
+  batchCommitSizeTestHelper(0);

Review comment:
      I think the test is clearer if you explicitly specify the expected result, as before.  Maybe the number of events could be a parameter, too, e.g. `batchCommitSizeTestHelper(int num_events_read, int batch_size, int num_events_processed)`.
   ```suggestion
     batchCommitSizeTestHelper(5, 1000, 5);
     batchCommitSizeTestHelper(5, 5, 5);
     batchCommitSizeTestHelper(5, 4, 4);
     batchCommitSizeTestHelper(5, 1, 1);
     batchCommitSizeTestHelper(5, 0, 5);
   ```

##########
File path: extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp
##########
@@ -335,21 +335,26 @@ TEST_CASE("ConsumeWindowsEventLog prints events in XML correctly", "[onTrigger]"
     test_controller.runSession(test_plan);
 
     REQUIRE(LogTestController::getInstance().contains(R"(<Event xmlns="http://schemas.microsoft.com/win/2004/08/events/event"><System><Provider Name="Application"/>)"));
-    REQUIRE(LogTestController::getInstance().contains(R"(<EventID Qualifiers="0">14985</EventID><Level>4</Level><Task>0</Task><Keywords>0x80000000000000</Keywords><TimeCreated SystemTime=")"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<EventID Qualifiers="0">14985</EventID>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<Level>4</Level>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<Task>0</Task>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<Keywords>0x80000000000000</Keywords><TimeCreated SystemTime=")"));
     // the timestamp (when the event was published) goes here
     REQUIRE(LogTestController::getInstance().contains(R"("/><EventRecordID>)"));
     // the ID of the event goes here (a number)
-    REQUIRE(LogTestController::getInstance().contains(R"(</EventRecordID><Channel>Application</Channel><Computer>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(</EventRecordID>)"));
+    REQUIRE(LogTestController::getInstance().contains(R"(<Channel>Application</Channel><Computer>)"));

Review comment:
      Why did you change this?  This is now a weaker condition, allowing additional elements, any order of the elements, multiple XML blocks each containing some of these elements, etc.

##########
File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
##########
@@ -298,120 +298,108 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
   logger_->log_trace("Successfully configured CWEL");
 }
 
+bool ConsumeWindowsEventLog::commitAndSaveBookmark(const std::wstring &bookmark_xml, const std::shared_ptr<core::ProcessSession> &session) {
+  {
+    const TimeDiff time_diff;
+    session->commit();
+    logger_->log_debug("processQueue commit took %" PRId64 " ms", time_diff());
+  }
+
+  if (!bookmark_->saveBookmarkXml(bookmark_xml)) {
+    logger_->log_error("Failed to save bookmark xml");
+  }
+
+  if (session->outgoingConnectionsFull("success")) {
+    logger_->log_debug("Outgoing success connection is full");
+    return false;
+  }
+
+  return true;
+}
+
+std::wstring ConsumeWindowsEventLog::processEventLogs(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session,
+    size_t& processed_event_count, const EVT_HANDLE& event_query_results) {

Review comment:
      Could this be `const`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org