You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/09/16 09:53:32 UTC

[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #891: MINIFICPP-1349 ConsumeWindowsEventLogs should honor batch commit size in a single session

adamdebreceni commented on a change in pull request #891:
URL: https://github.com/apache/nifi-minifi-cpp/pull/891#discussion_r489312613



##########
File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
##########
@@ -298,120 +298,111 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
   logger_->log_trace("Successfully configured CWEL");
 }
 
+// Commits the current session and then persists the bookmark of the last event placed into it.
+//
+// @param bookmark_xml  bookmark XML to hand to bookmark_->saveBookmarkXml() after the commit
+// @param session       session whose pending work is committed
+// @return false if the outgoing "success" connection reports full after the commit
+//         (the caller in onTrigger yields in that case), true otherwise.
+//
+// NOTE(review): the bookmark is saved only after session->commit() returns, presumably so the
+// stored bookmark never points past events that were not yet committed — confirm intent.
+bool ConsumeWindowsEventLog::commitAndSaveBookmark(const std::wstring &bookmark_xml, const std::shared_ptr<core::ProcessSession> &session) {
+  {
+    // Scoped so the timer only measures the commit itself.
+    const TimeDiff time_diff;
+    session->commit();
+    logger_->log_debug("processQueue commit took %" PRId64 " ms", time_diff());
+  }
+
+  // A failed bookmark save is only logged; it does not change the return value.
+  if (!bookmark_->saveBookmarkXml(bookmark_xml)) {
+    logger_->log_error("Failed to save bookmark xml");
+  }
+
+  // Back-pressure signal for the caller.
+  if (session->outgoingConnectionsFull("success")) {
+    logger_->log_debug("Outgoing success connection is full");
+    return false;
+  }
+
+  return true;
+}
+
+// Pulls events one at a time from an already-positioned query result set, renders each one and
+// passes it to putEventRenderFlowFileToSession() on the given session.
+//
+// Stops when batch_commit_size_ events have been processed (batch_commit_size_ == 0 means no
+// limit) or when EvtNext() fails with ERROR_NO_MORE_ITEMS.
+//
+// @param context              NOTE(review): not used in this body.
+// @param session              session that receives one put per successfully processed event
+// @param event_query_results  handle returned by EvtQuery(); this function does not close it
+// @return (number of events processed, bookmark XML of the last processed event). The XML is
+//         empty when no event was processed.
+std::tuple<size_t, std::wstring> ConsumeWindowsEventLog::processEventLogs(const std::shared_ptr<core::ProcessContext> &context,
+    const std::shared_ptr<core::ProcessSession> &session, const EVT_HANDLE& event_query_results) {
+  size_t processed_event_count = 0;
+  std::wstring bookmark_xml;
+  logger_->log_trace("Enumerating the events in the result set after the bookmarked event.");
+  while (processed_event_count < batch_commit_size_ || batch_commit_size_ == 0) {
+    EVT_HANDLE next_event{};
+    DWORD handles_set_count{};
+    if (!EvtNext(event_query_results, 1, &next_event, EVT_NEXT_TIMEOUT_MS, 0, &handles_set_count)) {
+      if (ERROR_NO_MORE_ITEMS != GetLastError()) {
+        // NOTE(review): any other error retries without bound, because processed_event_count is
+        // not advanced. A persistent error (for example, a timeout) could keep this loop spinning.
+        // Consider capping retries.
+        LogWindowsError("Failed to get next event");
+        continue;
+        /* According to MS this iteration should only end when the return value is false AND
+          the error code is NO_MORE_ITEMS. See the following page for further details:
+          https://docs.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtnext */
+      }
+      break;
+    }
+
+    // Closes the event handle at the end of this iteration, on every path.
+    const auto guard_next_event = gsl::finally([next_event]() { EvtClose(next_event); });
+    logger_->log_trace("Succesfully got the next event, performing event rendering");
+    EventRender event_render;
+    std::wstring new_bookmark_xml;
+    // The event counts, and the bookmark advances, only when both rendering and bookmark
+    // generation succeed. Otherwise the event is silently skipped.
+    if (createEventRender(next_event, event_render) && bookmark_->getNewBookmarkXml(next_event, new_bookmark_xml)) {
+      bookmark_xml = std::move(new_bookmark_xml);
+      processed_event_count++;
+      putEventRenderFlowFileToSession(event_render, *session);
+    }
+  }
+  logger_->log_trace("Finished enumerating events.");
+  return std::make_tuple(processed_event_count, bookmark_xml);
+}
 
 void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  if (!pBookmark_) {
-    logger_->log_debug("pBookmark_ is null");
+  if (!bookmark_) {
+    logger_->log_debug("bookmark_ is null");
     context->yield();
     return;
   }
 
-  std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
+  std::unique_lock<std::mutex> lock(on_trigger_mutex_, std::try_to_lock);
   if (!lock.owns_lock()) {
     logger_->log_warn("processor was triggered before previous listing finished, configuration should be revised!");
     return;
   }
 
   logger_->log_trace("CWEL onTrigger");
 
-  struct TimeDiff {
-    auto operator()() const {
-      return int64_t{ std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - time_).count() };
-    }
-    const decltype(std::chrono::steady_clock::now()) time_ = std::chrono::steady_clock::now();
-  };
-
-  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml) {
-    const TimeDiff timeDiff;
-    session->commit();
-    logger_->log_debug("processQueue commit took %" PRId64 " ms", timeDiff());
-
-    const bool successful_save = pBookmark_->saveBookmarkXml(bookmarkXml);
-    if (!successful_save) {
-      logger_->log_error("Failed to save bookmark xml");
-    }
-
-    if (session->outgoingConnectionsFull("success")) {
-      logger_->log_debug("outgoingConnectionsFull");
-      return false;
-    }
-
-    return true;
-  };
-
-  size_t eventCount = 0;
-  const TimeDiff timeDiff;
+  size_t processed_event_count = 0;
+  const TimeDiff time_diff;
   const auto timeGuard = gsl::finally([&]() {
-    logger_->log_debug("processed %zu Events in %"  PRId64 " ms", eventCount, timeDiff());
+    logger_->log_debug("processed %zu Events in %"  PRId64 " ms", processed_event_count, time_diff());
   });
 
-  size_t commitAndSaveBookmarkCount = 0;
-  std::wstring bookmarkXml;
-
-  const auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
-  if (!hEventResults) {
+  const auto event_query_results = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
+  if (!event_query_results) {
     LOG_LAST_ERROR(EvtQuery);
     context->yield();
     return;
   }
-  const auto guard_hEventResults = gsl::finally([hEventResults]() { EvtClose(hEventResults); });
+  const auto guard_event_query_results = gsl::finally([event_query_results]() { EvtClose(event_query_results); });
 
   logger_->log_trace("Retrieved results in Channel: %ls with Query: %ls", wstrChannel_.c_str(), wstrQuery_.c_str());
 
-  auto hBookmark = pBookmark_->getBookmarkHandleFromXML();
-  if (!hBookmark) {
-    logger_->log_error("hBookmark is null, unrecoverable error!"); 
-    pBookmark_.reset();
+  auto bookmark_handle = bookmark_->getBookmarkHandleFromXML();
+  if (!bookmark_handle) {
+    logger_->log_error("bookmark_handle is null, unrecoverable error!");
+    bookmark_.reset();
     context->yield();
     return;
   }
 
-  if (!EvtSeek(hEventResults, 1, hBookmark, 0, EvtSeekRelativeToBookmark)) {
+  if (!EvtSeek(event_query_results, 1, bookmark_handle, 0, EvtSeekRelativeToBookmark)) {
     LOG_LAST_ERROR(EvtSeek);
     context->yield();
     return;
   }
 
   refreshTimeZoneData();
 
-  logger_->log_trace("Enumerating the events in the result set after the bookmarked event.");
-  while (true) {
-    EVT_HANDLE hEvent{};
-    DWORD dwReturned{};
-    if (!EvtNext(hEventResults, 1, &hEvent, EVT_NEXT_TIMEOUT_MS, 0, &dwReturned)) {
-      if (ERROR_NO_MORE_ITEMS != GetLastError()) {
-        LogWindowsError("Failed to get next event");
-        continue; 
-        /* According to MS this iteration should only end when the return value is false AND 
-         the error code is NO_MORE_ITEMS. See the following page for further details:
-         https://docs.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtnext */
-      }
-      break;
-    }
-    const auto guard_hEvent = gsl::finally([hEvent]() { EvtClose(hEvent); });
-    logger_->log_trace("Succesfully get the next hEvent, performing event rendering");
-    EventRender eventRender;
-    std::wstring newBookmarkXml;
-    if (createEventRender(hEvent, eventRender) && pBookmark_->getNewBookmarkXml(hEvent, newBookmarkXml)) {
-      bookmarkXml = std::move(newBookmarkXml);
-      eventCount++;
-      putEventRenderFlowFileToSession(eventRender, *session);
-
-      if (batch_commit_size_ != 0U && (eventCount % batch_commit_size_ == 0)) {
-        if (!commitAndSaveBookmark(bookmarkXml)) {
-          context->yield();
-          return;
-        }
-
-        commitAndSaveBookmarkCount = eventCount;
-      }
-    }
-  }
-
-  logger_->log_trace("Finish enumerating events.");
+  std::wstring bookmark_xml;
+  std::tie(processed_event_count, bookmark_xml) = processEventLogs(context, session, event_query_results);
 
-  if (eventCount > commitAndSaveBookmarkCount) {
-    commitAndSaveBookmark(bookmarkXml);
+  if (processed_event_count == 0 || !commitAndSaveBookmark(bookmark_xml, session)) {
+    context->yield();

Review comment:
      Are we sure we want to yield the CWEL processor when there are no events to process? For example, GetFile does not yield if its input directory is empty.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org