Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/02/24 11:04:52 UTC

[GitHub] [nifi-minifi-cpp] am-c-p-p opened a new pull request #741: MINIFICPP-1139 Implemented.

am-c-p-p opened a new pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741
 
 
   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that the format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] am-c-p-p commented on issue #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on issue #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#issuecomment-592702147
 
 
   All review comments are addressed.
   


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r385727475
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -391,322 +463,122 @@ void ConsumeWindowsEventLog::substituteXMLPercentageItems(pugi::xml_document& do
   doc.traverse(treeWalker);
 }
 
-void ConsumeWindowsEventLog::processEvent(EVT_HANDLE hEvent) {
+bool ConsumeWindowsEventLog::createEventRender(EVT_HANDLE hEvent, EventRender& eventRender) {
   DWORD size = 0;
   DWORD used = 0;
   DWORD propertyCount = 0;
-  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount)) {
-    if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) {
-      if (used > maxBufferSize_) {
-        logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
-        return;
-      }
-
-      size = used;
-      std::vector<wchar_t> buf(size / 2 + 1);
-      if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
-        logger_->log_error("!EvtRender error: %d.", GetLastError());
-        return;
-      }
-
-      std::string xml = wel::to_string(&buf[0]);
-
-      pugi::xml_document doc;
-      pugi::xml_parse_result result = doc.load_string(xml.c_str());
-
-      if (!result) {
-        logger_->log_error("Invalid XML produced");
-        return;
-      }
-      // this is a well known path. 
-      std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
-      wel::MetadataWalker walker(getEventLogHandler(providerName).getMetadata(), channel_, hEvent, !resolve_as_attributes_, apply_identifier_function_, regex_);
-
-      // resolve the event metadata
-      doc.traverse(walker);
-
-      EventRender renderedData;
-
-      if (writePlainText_) {
-        auto handler = getEventLogHandler(providerName);
-        auto message = handler.getEventMessage(hEvent);
-
-        if (!message.empty()) {
-
-          for (const auto &mapEntry : walker.getIdentifiers()) {
-            // replace the identifiers with their translated strings.
-            utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
-          }
-          wel::WindowsEventLogHeader log_header(header_names_);
-          // set the delimiter
-          log_header.setDelimiter(header_delimiter_);
-          // render the header.
-          renderedData.rendered_text_ = log_header.getEventHeader(&walker);
-          renderedData.rendered_text_ += "Message" + header_delimiter_ + " ";
-          renderedData.rendered_text_ += message;
-        }
-      }
-
-      if (writeXML_) {
-        substituteXMLPercentageItems(doc);
-
-        if (resolve_as_attributes_) {
-          renderedData.matched_fields_ = walker.getFieldValues();
-        }
-
-        wel::XmlString writer;
-        doc.print(writer, "", pugi::format_raw); // no indentation or formatting
-        xml = writer.xml_;
-
-        renderedData.text_ = std::move(xml);
-      }
-
-      if (pBookmark_) {
-        std::wstring bookmarkXml;
-        if (pBookmark_->getNewBookmarkXml(hEvent, bookmarkXml)) {
-          renderedData.bookmarkXml_ = bookmarkXml;
-        }
-      }
-
-      listRenderedData_.enqueue(std::move(renderedData));
-    }
+  EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount);
+  if (ERROR_INSUFFICIENT_BUFFER != GetLastError()) {
+    logger_->log_error("!EvtRender error %d.", GetLastError());
+    return false;
   }
-}
 
-bool ConsumeWindowsEventLog::processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring& channel, const std::wstring& query) {
-  if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark)) {
-    logger_->log_error("!EvtSeek error %d.", GetLastError());
+  if (used > maxBufferSize_) {
+    logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
 
 Review comment:
   I prefer to keep new code correct, even if this means inconsistency with the old code. I'll do the wider-scale replacement in [MINIFICPP-1151](https://issues.apache.org/jira/projects/MINIFICPP/issues/MINIFICPP-1151), as it results in compiler warnings.
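The log line under review passes `maxBufferSize_` through a bare `%ll`, which is a length modifier with no conversion character and therefore not a valid printf conversion. Below is a minimal sketch of a conforming version, assuming `maxBufferSize_` is a `uint64_t` (its actual type is not shown in this hunk). The `formatDropMessage` helper is hypothetical; it omits the event handle to stay portable and uses the `PRIu64` macro from `<cinttypes>`.

```cpp
#include <cinttypes>  // PRIu64
#include <cstdint>
#include <cstdio>
#include <string>

// Hypothetical helper, assuming the buffer size is a uint64_t.
// A bare "%ll" has no conversion character; "%" PRIu64 expands to the
// correct complete specifier for uint64_t on the current platform.
std::string formatDropMessage(std::uint64_t maxBufferSize) {
  char buf[128];
  std::snprintf(buf, sizeof(buf),
                "Dropping event because it couldn't be rendered within %" PRIu64 " bytes.",
                maxBufferSize);
  return std::string(buf);
}
```

If the member is actually a signed 64-bit type, `PRId64` is the matching macro; `%lld` with a cast to `long long` is another option.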


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r383785899
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -248,21 +239,119 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
     }
   }
 
-  if (subscriptionHandle_) {
-    logger_->log_error("Processor already subscribed to Event Log, expected cleanup to unsubscribe.");
-  } else {
-    sessionFactory_ = sessionFactory;
+  context->getProperty(Channel.getName(), channel_);
+  wstrChannel_ = std::wstring(channel_.begin(), channel_.end());
+
+  std::string query;
+  context->getProperty(Query.getName(), query);
+  wstrQuery_ = std::wstring(query.begin(), query.end());
 
-    subscribe(context);
+  if (!pBookmark_) {
+    std::string bookmarkDir;
+    context->getProperty(BookmarkRootDirectory.getName(), bookmarkDir);
+    if (bookmarkDir.empty()) {
+      logger_->log_error("State Directory is empty");
+      return;
+    }
+    pBookmark_ = std::make_unique<Bookmark>(wstrChannel_, wstrQuery_, bookmarkDir, getUUIDStr(), logger_);
+    if (!*pBookmark_) {
+      pBookmark_.reset();
+      return;
+    }
   }
+
+  context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
+  logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %lld", maxBufferSize_);
+
+  provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query;
 }
 
-void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  if (!subscriptionHandle_) {
-    if (!subscribe(context)) {
-      context->yield();
+void ConsumeWindowsEventLog::processEventsAfterBookmark(core::ProcessSession& session) {
+  // External loop is used in case if there are new events while the events after bookmark are processed.
+  bool hasEvent{};
+  do {
+    hasEvent = false;
+
+    auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
+    if (!hEventResults) {
+      logger_->log_error("!EvtQuery error: %d.", GetLastError());
       return;
     }
+    const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults); });
+
+    if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark)) {
+      logger_->log_error("!EvtSeek error: %d.", GetLastError());
+      return;
+    }
+
+    size_t eventCount = 0;
+    auto before_time = std::chrono::high_resolution_clock::now();
+    utils::ScopeGuard timeGuard([&]() {
+      logger_->log_debug("processed %d Events in %llu ms",
+                         eventCount,
+                         std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_time).count());
+    });
+
+    // Enumerate the events in the result set after the bookmarked event.
+    bool commitAndSaveBookmark{};
+    std::wstring bookmarkXml;
+    while (true) {
+      EVT_HANDLE hEvent{};
+      DWORD dwReturned{};
+      if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
+        DWORD status = ERROR_SUCCESS;
+        if (ERROR_NO_MORE_ITEMS != (status = GetLastError())) {
+          logger_->log_error("!EvtNext error %d.", status);
+        }
+        break;
+      }
+      const utils::ScopeGuard guard_hEvent([hEvent]() { EvtClose(hEvent); });
+
+      commitAndSaveBookmark = false;
+      EventRender renderedData;
+      if (processEvent(hEvent, renderedData)) {
+        if (pBookmark_->getNewBookmarkXml(hEvent, bookmarkXml)) {
+          commitAndSaveBookmark = true;
+
+          eventCount++;
+          processEventRender(renderedData, session);
+
+          if (batch_commit_size_ != 0U && (eventCount % batch_commit_size_ == 0)) {
+            auto before_commit = std::chrono::high_resolution_clock::now();
+            session.commit();
+            logger_->log_debug("processQueue commit took %llu ms",
+                               std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_commit).count());
+
+            pBookmark_->saveBookmarkXml(bookmarkXml);
+
+            if (session.outgoingConnectionsFull("success")) {
+              return;
+            }
+
+            commitAndSaveBookmark = false;
+          }
+        }
+      }
+
+      hasEvent = true;
+    }
+
+    if (commitAndSaveBookmark) {
+      session.commit();
 
 Review comment:
   Removed the outer loop, since I fixed a bug: the bookmark handle should be taken from the last saved XML (`pBookmark_->getBookmarkHandleFromXML()`).
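The batch-commit and bookmark logic in the hunk above can be modelled without the Windows Event Log API. In this portable simulation, which illustrates the control flow and is not the processor's code, events are plain record IDs, a "commit" appends the current batch to a list, and the bookmark is the last committed ID; `processAfterBookmark` and `BatchResult` are hypothetical names. In this model each call starts from the last saved bookmark, so events that arrive later are picked up by the next call. The `outgoingConnectionsFull` early return is left out.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-ins for the session and the bookmark.
struct BatchResult {
  std::vector<std::vector<std::uint64_t>> commits;  // one entry per commit
  std::uint64_t bookmark;                           // last saved record ID
};

// Processes every event whose ID is greater than `bookmark`, committing after
// each full batch and once more for a trailing partial batch.
// batchSize == 0 means "commit only at the end" (like batch_commit_size_ == 0).
BatchResult processAfterBookmark(const std::vector<std::uint64_t>& events,
                                 std::uint64_t bookmark, std::size_t batchSize) {
  BatchResult result{{}, bookmark};
  std::vector<std::uint64_t> pending;
  for (std::uint64_t id : events) {
    if (id <= bookmark) continue;  // already handled before the bookmark
    pending.push_back(id);
    if (batchSize != 0 && pending.size() == batchSize) {
      result.commits.push_back(pending);
      result.bookmark = id;  // the bookmark is saved only after the commit
      pending.clear();
    }
  }
  if (!pending.empty()) {  // corresponds to the final commitAndSaveBookmark check
    result.commits.push_back(pending);
    result.bookmark = pending.back();
  }
  return result;
}
```

Saving the bookmark only after a successful commit means a crash between the two replays at most one batch instead of losing it.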


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r385564100
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -391,322 +463,122 @@ void ConsumeWindowsEventLog::substituteXMLPercentageItems(pugi::xml_document& do
   doc.traverse(treeWalker);
 }
 
-void ConsumeWindowsEventLog::processEvent(EVT_HANDLE hEvent) {
+bool ConsumeWindowsEventLog::createEventRender(EVT_HANDLE hEvent, EventRender& eventRender) {
   DWORD size = 0;
   DWORD used = 0;
   DWORD propertyCount = 0;
-  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount)) {
-    if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) {
-      if (used > maxBufferSize_) {
-        logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
-        return;
-      }
-
-      size = used;
-      std::vector<wchar_t> buf(size / 2 + 1);
-      if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
-        logger_->log_error("!EvtRender error: %d.", GetLastError());
-        return;
-      }
-
-      std::string xml = wel::to_string(&buf[0]);
-
-      pugi::xml_document doc;
-      pugi::xml_parse_result result = doc.load_string(xml.c_str());
-
-      if (!result) {
-        logger_->log_error("Invalid XML produced");
-        return;
-      }
-      // this is a well known path. 
-      std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
-      wel::MetadataWalker walker(getEventLogHandler(providerName).getMetadata(), channel_, hEvent, !resolve_as_attributes_, apply_identifier_function_, regex_);
-
-      // resolve the event metadata
-      doc.traverse(walker);
-
-      EventRender renderedData;
-
-      if (writePlainText_) {
-        auto handler = getEventLogHandler(providerName);
-        auto message = handler.getEventMessage(hEvent);
-
-        if (!message.empty()) {
-
-          for (const auto &mapEntry : walker.getIdentifiers()) {
-            // replace the identifiers with their translated strings.
-            utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
-          }
-          wel::WindowsEventLogHeader log_header(header_names_);
-          // set the delimiter
-          log_header.setDelimiter(header_delimiter_);
-          // render the header.
-          renderedData.rendered_text_ = log_header.getEventHeader(&walker);
-          renderedData.rendered_text_ += "Message" + header_delimiter_ + " ";
-          renderedData.rendered_text_ += message;
-        }
-      }
-
-      if (writeXML_) {
-        substituteXMLPercentageItems(doc);
-
-        if (resolve_as_attributes_) {
-          renderedData.matched_fields_ = walker.getFieldValues();
-        }
-
-        wel::XmlString writer;
-        doc.print(writer, "", pugi::format_raw); // no indentation or formatting
-        xml = writer.xml_;
-
-        renderedData.text_ = std::move(xml);
-      }
-
-      if (pBookmark_) {
-        std::wstring bookmarkXml;
-        if (pBookmark_->getNewBookmarkXml(hEvent, bookmarkXml)) {
-          renderedData.bookmarkXml_ = bookmarkXml;
-        }
-      }
-
-      listRenderedData_.enqueue(std::move(renderedData));
-    }
+  EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount);
+  if (ERROR_INSUFFICIENT_BUFFER != GetLastError()) {
+    logger_->log_error("!EvtRender error %d.", GetLastError());
+    return false;
   }
-}
 
-bool ConsumeWindowsEventLog::processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring& channel, const std::wstring& query) {
-  if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark)) {
-    logger_->log_error("!EvtSeek error %d.", GetLastError());
+  if (used > maxBufferSize_) {
+    logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
 
 Review comment:
   Fixed.


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r386514966
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,19 +280,88 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  struct TimeDiff {
+    auto operator()() const {
+      return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - time_).count();
+    }
+    const decltype(std::chrono::steady_clock::now()) time_ = std::chrono::steady_clock::now();
 
 Review comment:
   `auto` is allowed only for `static` class data members.
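To illustrate the rule: a non-static data member cannot be declared with `auto`, while a `static constexpr` one can, which is why the hunk spells out the member's type with `decltype`. In the hypothetical `TimeDiffMembers` struct below, `std::chrono::steady_clock::time_point` names the same type as `decltype(std::chrono::steady_clock::now())`, which the `static_assert` checks at compile time.

```cpp
#include <chrono>
#include <type_traits>

// Hypothetical struct showing which data members may use auto.
struct TimeDiffMembers {
  // Allowed: a static constexpr data member may be declared with auto.
  static constexpr auto kMillisPerSecond = 1000;

  // Ill-formed if uncommented: auto time_ = std::chrono::steady_clock::now();
  // A non-static member needs its type spelled out, e.g. via decltype.
  const decltype(std::chrono::steady_clock::now()) time_ = std::chrono::steady_clock::now();
};

// decltype(now()) and steady_clock::time_point are the same type.
static_assert(std::is_same<decltype(std::chrono::steady_clock::now()),
                           std::chrono::steady_clock::time_point>::value,
              "now() returns steady_clock::time_point");
```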


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r386381465
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,19 +280,88 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  struct TimeDiff {
+    auto operator()() const {
 
 Review comment:
   Since the scope is local and we want to be able to reliably print the duration as milliseconds, I suggest using `uint64_t` instead of `auto` as the return type, to avoid casting at the call sites.
   
   I can also live with wrapping the call sites in `uint64_t{ timeDiff() }`, but I prefer an explicit return type for the sake of easy logging.
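A sketch of the `TimeDiff` helper from the hunk with the explicit `uint64_t` return type suggested above. It keeps `steady_clock`, as in the hunk; because that clock is monotonic, the difference is non-negative and the `static_cast` to an unsigned type cannot wrap. The `logElapsed` function is a hypothetical call site that uses `printf` in place of the project logger.

```cpp
#include <chrono>
#include <cinttypes>
#include <cstdint>
#include <cstdio>

// TimeDiff with an explicit uint64_t return type.
struct TimeDiff {
  std::uint64_t operator()() const {
    // steady_clock is monotonic, so the difference is never negative and
    // the conversion to an unsigned type is value-preserving.
    return static_cast<std::uint64_t>(
        std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::steady_clock::now() - time_).count());
  }
  const std::chrono::steady_clock::time_point time_ = std::chrono::steady_clock::now();
};

// Hypothetical call site: one portable specifier and no cast needed here.
void logElapsed(const TimeDiff& timeDiff) {
  std::printf("processed events in %" PRIu64 " ms\n", timeDiff());
}
```

With the return type fixed in one place, every call site logs the same type with the same specifier.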


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r383784238
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -248,21 +239,119 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
     }
   }
 
-  if (subscriptionHandle_) {
-    logger_->log_error("Processor already subscribed to Event Log, expected cleanup to unsubscribe.");
-  } else {
-    sessionFactory_ = sessionFactory;
+  context->getProperty(Channel.getName(), channel_);
+  wstrChannel_ = std::wstring(channel_.begin(), channel_.end());
+
+  std::string query;
+  context->getProperty(Query.getName(), query);
+  wstrQuery_ = std::wstring(query.begin(), query.end());
 
-    subscribe(context);
+  if (!pBookmark_) {
+    std::string bookmarkDir;
+    context->getProperty(BookmarkRootDirectory.getName(), bookmarkDir);
+    if (bookmarkDir.empty()) {
+      logger_->log_error("State Directory is empty");
+      return;
+    }
+    pBookmark_ = std::make_unique<Bookmark>(wstrChannel_, wstrQuery_, bookmarkDir, getUUIDStr(), logger_);
+    if (!*pBookmark_) {
+      pBookmark_.reset();
+      return;
+    }
   }
+
+  context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
+  logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %lld", maxBufferSize_);
+
+  provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query;
 }
 
-void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  if (!subscriptionHandle_) {
-    if (!subscribe(context)) {
-      context->yield();
+void ConsumeWindowsEventLog::processEventsAfterBookmark(core::ProcessSession& session) {
+  // External loop is used in case if there are new events while the events after bookmark are processed.
+  bool hasEvent{};
+  do {
+    hasEvent = false;
+
+    auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
+    if (!hEventResults) {
+      logger_->log_error("!EvtQuery error: %d.", GetLastError());
       return;
     }
+    const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults); });
+
+    if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark)) {
+      logger_->log_error("!EvtSeek error: %d.", GetLastError());
+      return;
+    }
+
+    size_t eventCount = 0;
+    auto before_time = std::chrono::high_resolution_clock::now();
+    utils::ScopeGuard timeGuard([&]() {
+      logger_->log_debug("processed %d Events in %llu ms",
+                         eventCount,
+                         std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_time).count());
+    });
+
+    // Enumerate the events in the result set after the bookmarked event.
+    bool commitAndSaveBookmark{};
+    std::wstring bookmarkXml;
+    while (true) {
+      EVT_HANDLE hEvent{};
+      DWORD dwReturned{};
+      if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
+        DWORD status = ERROR_SUCCESS;
+        if (ERROR_NO_MORE_ITEMS != (status = GetLastError())) {
+          logger_->log_error("!EvtNext error %d.", status);
+        }
+        break;
+      }
+      const utils::ScopeGuard guard_hEvent([hEvent]() { EvtClose(hEvent); });
+
+      commitAndSaveBookmark = false;
+      EventRender renderedData;
+      if (processEvent(hEvent, renderedData)) {
+        if (pBookmark_->getNewBookmarkXml(hEvent, bookmarkXml)) {
+          commitAndSaveBookmark = true;
+
+          eventCount++;
+          processEventRender(renderedData, session);
+
+          if (batch_commit_size_ != 0U && (eventCount % batch_commit_size_ == 0)) {
+            auto before_commit = std::chrono::high_resolution_clock::now();
+            session.commit();
+            logger_->log_debug("processQueue commit took %llu ms",
+                               std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_commit).count());
+
+            pBookmark_->saveBookmarkXml(bookmarkXml);
+
+            if (session.outgoingConnectionsFull("success")) {
+              return;
+            }
+
+            commitAndSaveBookmark = false;
+          }
 
 Review comment:
   Fixed.


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r383786321
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -248,21 +239,119 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
     }
   }
 
-  if (subscriptionHandle_) {
-    logger_->log_error("Processor already subscribed to Event Log, expected cleanup to unsubscribe.");
-  } else {
-    sessionFactory_ = sessionFactory;
+  context->getProperty(Channel.getName(), channel_);
+  wstrChannel_ = std::wstring(channel_.begin(), channel_.end());
+
+  std::string query;
+  context->getProperty(Query.getName(), query);
+  wstrQuery_ = std::wstring(query.begin(), query.end());
 
-    subscribe(context);
+  if (!pBookmark_) {
+    std::string bookmarkDir;
+    context->getProperty(BookmarkRootDirectory.getName(), bookmarkDir);
+    if (bookmarkDir.empty()) {
+      logger_->log_error("State Directory is empty");
+      return;
+    }
+    pBookmark_ = std::make_unique<Bookmark>(wstrChannel_, wstrQuery_, bookmarkDir, getUUIDStr(), logger_);
+    if (!*pBookmark_) {
+      pBookmark_.reset();
+      return;
+    }
   }
+
+  context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
+  logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %lld", maxBufferSize_);
+
+  provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query;
 }
 
-void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  if (!subscriptionHandle_) {
-    if (!subscribe(context)) {
-      context->yield();
+void ConsumeWindowsEventLog::processEventsAfterBookmark(core::ProcessSession& session) {
+  // External loop is used in case if there are new events while the events after bookmark are processed.
+  bool hasEvent{};
+  do {
+    hasEvent = false;
+
+    auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
+    if (!hEventResults) {
+      logger_->log_error("!EvtQuery error: %d.", GetLastError());
       return;
     }
+    const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults); });
+
+    if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark)) {
+      logger_->log_error("!EvtSeek error: %d.", GetLastError());
+      return;
+    }
+
+    size_t eventCount = 0;
+    auto before_time = std::chrono::high_resolution_clock::now();
+    utils::ScopeGuard timeGuard([&]() {
+      logger_->log_debug("processed %d Events in %llu ms",
+                         eventCount,
+                         std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_time).count());
+    });
 
 Review comment:
   Fixed.
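The hunk above relies on `utils::ScopeGuard` to close event handles and to emit the timing log on every exit path, including the early `return`s. A minimal guard of that kind is sketched below; `ScopeGuardSketch` and `countWithGuard` are hypothetical, and the real `utils::ScopeGuard` interface may differ.

```cpp
#include <functional>
#include <utility>

// Hypothetical minimal scope guard: runs the stored callable when the guard
// is destroyed, i.e. on every way out of the enclosing scope.
class ScopeGuardSketch {
 public:
  explicit ScopeGuardSketch(std::function<void()> onExit) : onExit_(std::move(onExit)) {}
  ~ScopeGuardSketch() {
    if (onExit_) onExit_();
  }
  ScopeGuardSketch(const ScopeGuardSketch&) = delete;
  ScopeGuardSketch& operator=(const ScopeGuardSketch&) = delete;

 private:
  std::function<void()> onExit_;
};

// Mirrors the early-return shape of processEventsAfterBookmark: the cleanup
// (standing in for EvtClose or the timing log) runs whichever branch returns.
int countWithGuard(bool returnEarly, int& cleanups) {
  ScopeGuardSketch guard([&cleanups]() { ++cleanups; });
  if (returnEarly) {
    return 1;
  }
  return 2;
}
```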


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r383300357
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -391,322 +467,122 @@ void ConsumeWindowsEventLog::substituteXMLPercentageItems(pugi::xml_document& do
   doc.traverse(treeWalker);
 }
 
-void ConsumeWindowsEventLog::processEvent(EVT_HANDLE hEvent) {
+bool ConsumeWindowsEventLog::processEvent(EVT_HANDLE hEvent, EventRender& renderedData) {
   DWORD size = 0;
   DWORD used = 0;
   DWORD propertyCount = 0;
-  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount)) {
-    if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) {
-      if (used > maxBufferSize_) {
-        logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
-        return;
-      }
-
-      size = used;
-      std::vector<wchar_t> buf(size / 2 + 1);
-      if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
-        logger_->log_error("!EvtRender error: %d.", GetLastError());
-        return;
-      }
-
-      std::string xml = wel::to_string(&buf[0]);
-
-      pugi::xml_document doc;
-      pugi::xml_parse_result result = doc.load_string(xml.c_str());
-
-      if (!result) {
-        logger_->log_error("Invalid XML produced");
-        return;
-      }
-      // this is a well known path. 
-      std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
-      wel::MetadataWalker walker(getEventLogHandler(providerName).getMetadata(), channel_, hEvent, !resolve_as_attributes_, apply_identifier_function_, regex_);
-
-      // resolve the event metadata
-      doc.traverse(walker);
-
-      EventRender renderedData;
-
-      if (writePlainText_) {
-        auto handler = getEventLogHandler(providerName);
-        auto message = handler.getEventMessage(hEvent);
-
-        if (!message.empty()) {
-
-          for (const auto &mapEntry : walker.getIdentifiers()) {
-            // replace the identifiers with their translated strings.
-            utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
-          }
-          wel::WindowsEventLogHeader log_header(header_names_);
-          // set the delimiter
-          log_header.setDelimiter(header_delimiter_);
-          // render the header.
-          renderedData.rendered_text_ = log_header.getEventHeader(&walker);
-          renderedData.rendered_text_ += "Message" + header_delimiter_ + " ";
-          renderedData.rendered_text_ += message;
-        }
-      }
-
-      if (writeXML_) {
-        substituteXMLPercentageItems(doc);
-
-        if (resolve_as_attributes_) {
-          renderedData.matched_fields_ = walker.getFieldValues();
-        }
-
-        wel::XmlString writer;
-        doc.print(writer, "", pugi::format_raw); // no indentation or formatting
-        xml = writer.xml_;
-
-        renderedData.text_ = std::move(xml);
-      }
-
-      if (pBookmark_) {
-        std::wstring bookmarkXml;
-        if (pBookmark_->getNewBookmarkXml(hEvent, bookmarkXml)) {
-          renderedData.bookmarkXml_ = bookmarkXml;
-        }
-      }
-
-      listRenderedData_.enqueue(std::move(renderedData));
-    }
+  EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount);
+  if (ERROR_INSUFFICIENT_BUFFER != GetLastError()) {
+    logger_->log_error("!EvtRender error %d.", GetLastError());
+    return false;
   }
-}
 
-bool ConsumeWindowsEventLog::processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring& channel, const std::wstring& query) {
-  if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark)) {
-    logger_->log_error("!EvtSeek error %d.", GetLastError());
+  if (used > maxBufferSize_) {
+    logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
     return false;
   }
 
-  // Enumerate the events in the result set after the bookmarked event.
-  while (true) {
-    EVT_HANDLE hEvent{};
-    DWORD dwReturned{};
-    if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
-      DWORD status = ERROR_SUCCESS;
-      if (ERROR_NO_MORE_ITEMS != (status = GetLastError())) {
-        logger_->log_error("!EvtNext error %d.", status);
-      }
-      break;
-    }
-
-    processEvent(hEvent);
-
-    EvtClose(hEvent);
+  size = used;
+  std::vector<wchar_t> buf(size / 2 + 1);
+  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
+    logger_->log_error("!EvtRender error: %d.", GetLastError());
+    return false;
   }
 
-  return true;
-}
-
+  std::string xml = wel::to_string(&buf[0]);
 
-bool ConsumeWindowsEventLog::subscribe(const std::shared_ptr<core::ProcessContext> &context) {
-  context->getProperty(Channel.getName(), channel_);
-  context->getProperty(Query.getName(), query_);
+  pugi::xml_document doc;
+  pugi::xml_parse_result result = doc.load_string(xml.c_str());
 
-  context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
-  logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %lld", maxBufferSize_);
-
-  provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query_;
-
-  std::string strInactiveDurationToReconnect;
-  context->getProperty(InactiveDurationToReconnect.getName(), strInactiveDurationToReconnect);
-
-  // Get 'inactiveDurationToReconnect_'.
-  core::TimeUnit unit;
-  if (core::Property::StringToTime(strInactiveDurationToReconnect, inactiveDurationToReconnect_, unit) &&
-    core::Property::ConvertTimeUnitToMS(inactiveDurationToReconnect_, unit, inactiveDurationToReconnect_)) {
-    logger_->log_info("inactiveDurationToReconnect: [%lld] ms", inactiveDurationToReconnect_);
-  }
-
-  if (!pBookmark_) {
-    logger_->log_error("!pBookmark_");
+  if (!result) {
+    logger_->log_error("Invalid XML produced");
     return false;
   }
 
-  auto channel = std::wstring(channel_.begin(), channel_.end());
-  auto query = std::wstring(query_.begin(), query_.end());
+  // this is a well known path. 
+  std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
+  wel::MetadataWalker walker(getEventLogHandler(providerName).getMetadata(), channel_, hEvent, !resolve_as_attributes_, apply_identifier_function_, regex_);
 
-  do {
-    auto hEventResults = EvtQuery(0, channel.c_str(), query.c_str(), EvtQueryChannelPath);
-    if (!hEventResults) {
-      logger_->log_error("!EvtQuery error: %d.", GetLastError());
-      // Consider it as a serious error.
-      return false;
-    }
-    const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults); });
+  // resolve the event metadata
+  doc.traverse(walker);
 
-    if (pBookmark_->hasBookmarkXml()) {
-      if (!processEventsAfterBookmark(hEventResults, channel, query)) {
-        break;
-      }
-    } else {
-      // Seek to the last event in the hEventResults.
-      if (!EvtSeek(hEventResults, 0, 0, 0, EvtSeekRelativeToLast)) {
-        logger_->log_error("!EvtSeek error: %d.", GetLastError());
-        break;
-      }
+  if (writePlainText_) {
+    auto handler = getEventLogHandler(providerName);
+    auto message = handler.getEventMessage(hEvent);
 
-      DWORD dwReturned{};
-      EVT_HANDLE hEvent{};
-      if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
-        logger_->log_error("!EvtNext error: %d.", GetLastError());
-        break;
+    if (!message.empty()) {
+
+      for (const auto &mapEntry : walker.getIdentifiers()) {
+        // replace the identifiers with their translated strings.
+        utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
       }
+      wel::WindowsEventLogHeader log_header(header_names_);
+      // set the delimiter
+      log_header.setDelimiter(header_delimiter_);
+      // render the header.
+      renderedData.rendered_text_ = log_header.getEventHeader(&walker);
+      renderedData.rendered_text_ += "Message" + header_delimiter_ + " ";
+      renderedData.rendered_text_ += message;
+    }
+  }
 
-      pBookmark_->saveBookmark(hEvent);
+  if (writeXML_) {
+    substituteXMLPercentageItems(doc);
+
+    if (resolve_as_attributes_) {
+      renderedData.matched_fields_ = walker.getFieldValues();
     }
-  } while (false);
-
-  subscriptionHandle_ = EvtSubscribe(
-      NULL,
-      NULL,
-      channel.c_str(),
-      query.c_str(),
-      NULL,
-      this,
-      [](EVT_SUBSCRIBE_NOTIFY_ACTION action, PVOID pContext, EVT_HANDLE hEvent)
-      {
-        auto pConsumeWindowsEventLog = static_cast<ConsumeWindowsEventLog*>(pContext);
-
-        auto& logger = pConsumeWindowsEventLog->logger_;
-
-        if (action == EvtSubscribeActionError) {
-          if (ERROR_EVT_QUERY_RESULT_STALE == (DWORD)hEvent) {
-            logger->log_error("Received missing event notification. Consider triggering processor more frequently or increasing queue size.");
-          } else {
-            logger->log_error("Received the following Win32 error: %x", hEvent);
-          }
-        } else if (action == EvtSubscribeActionDeliver) {
-          pConsumeWindowsEventLog->processEvent(hEvent);
-        }
 
-        return 0UL;
-      },
-      EvtSubscribeToFutureEvents | EvtSubscribeStrict);
+    wel::XmlString writer;
+    doc.print(writer, "", pugi::format_raw); // no indentation or formatting
+    xml = writer.xml_;
 
-  if (!subscriptionHandle_) {
-    logger_->log_error("Unable to subscribe with provided parameters, received the following error code: %d", GetLastError());
-    return false;
+    renderedData.text_ = std::move(xml);
   }
 
-  lastActivityTimestamp_ = GetTickCount64();
-
   return true;
 }
 
-void ConsumeWindowsEventLog::unsubscribe()
-{
-  if (subscriptionHandle_) {
-    EvtClose(subscriptionHandle_);
-    subscriptionHandle_ = 0;
-  }
-}
-
-int ConsumeWindowsEventLog::processQueue(const std::shared_ptr<core::ProcessSession> &session)
+void ConsumeWindowsEventLog::processEventRender(const EventRender& renderedData, core::ProcessSession& session)
 {
-  struct WriteCallback: public OutputStreamCallback {
+  struct WriteCallback : public OutputStreamCallback {
     WriteCallback(const std::string& str)
-      : data_(str.c_str()), size_(str.length()) {
+      : str_(str.c_str()) {
 
 Review comment:
  I don't think this works. Since `str_` is now a reference and we initialize it with a `const char*`, this creates a temporary `std::string` holding a copy of `str`, binds `str_` to that temporary, and then destroys the temporary once the constructor finishes, leaving us with a dangling reference.
   
   This should be `str_(str)`, so that we bind `str_` to the same object as `str` refers to.
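A minimal sketch of the suggested fix, using a hypothetical, stripped-down `WriteCallback` (the real one derives from MiNiFi's `OutputStreamCallback`, which is left out here). Initializing the member with `str_(str)` makes it refer to the caller's string rather than to a temporary. As far as I know, current standard wording makes binding a reference member to a temporary in a mem-initializer ill-formed, although some compilers only warn and leave the member dangling.

```cpp
#include <cassert>
#include <string>

// Hypothetical, stripped-down stand-in for the WriteCallback under review.
struct WriteCallback {
  explicit WriteCallback(const std::string& str)
      : str_(str) {}  // binds str_ to the caller's std::string; no temporary is made

  // Writing str_(str.c_str()) here would instead create a temporary
  // std::string and bind the reference member to it.

  const std::string& str_;  // the caller must keep the string alive
};

// True when the callback refers to the caller's string object itself.
bool refersToCaller(const std::string& s) {
  const WriteCallback cb(s);
  return &cb.str_ == &s;
}
```

The design trade-off is lifetime: the callback stores no copy, so it must not outlive the string passed to it.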

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r383266727
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -248,21 +239,119 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
     }
   }
 
-  if (subscriptionHandle_) {
-    logger_->log_error("Processor already subscribed to Event Log, expected cleanup to unsubscribe.");
-  } else {
-    sessionFactory_ = sessionFactory;
+  context->getProperty(Channel.getName(), channel_);
+  wstrChannel_ = std::wstring(channel_.begin(), channel_.end());
+
+  std::string query;
+  context->getProperty(Query.getName(), query);
+  wstrQuery_ = std::wstring(query.begin(), query.end());
 
-    subscribe(context);
+  if (!pBookmark_) {
+    std::string bookmarkDir;
+    context->getProperty(BookmarkRootDirectory.getName(), bookmarkDir);
+    if (bookmarkDir.empty()) {
+      logger_->log_error("State Directory is empty");
+      return;
+    }
+    pBookmark_ = std::make_unique<Bookmark>(wstrChannel_, wstrQuery_, bookmarkDir, getUUIDStr(), logger_);
+    if (!*pBookmark_) {
+      pBookmark_.reset();
+      return;
+    }
   }
+
+  context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
+  logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %lld", maxBufferSize_);
+
+  provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query;
 }
 
-void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  if (!subscriptionHandle_) {
-    if (!subscribe(context)) {
-      context->yield();
+void ConsumeWindowsEventLog::processEventsAfterBookmark(core::ProcessSession& session) {
+  // External loop is used in case if there are new events while the events after bookmark are processed.
+  bool hasEvent{};
+  do {
+    hasEvent = false;
+
+    auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
+    if (!hEventResults) {
+      logger_->log_error("!EvtQuery error: %d.", GetLastError());
       return;
     }
+    const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults); });
+
+    if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark)) {
+      logger_->log_error("!EvtSeek error: %d.", GetLastError());
+      return;
+    }
+
+    size_t eventCount = 0;
+    auto before_time = std::chrono::high_resolution_clock::now();
+    utils::ScopeGuard timeGuard([&]() {
+      logger_->log_debug("processed %d Events in %llu ms",
+                         eventCount,
+                         std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_time).count());
+    });
 
 Review comment:
  I'd only use `std::chrono::high_resolution_clock` for measuring intervals if `std::chrono::high_resolution_clock::is_steady` is true. Since that isn't guaranteed, I suggest using `std::chrono::steady_clock` instead.
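One way to express this rule in code, sketched below: pick `high_resolution_clock` only when it reports `is_steady`, and fall back to `steady_clock` otherwise. `IntervalClock` and `elapsedSince` are illustrative names, not part of the PR.

```cpp
#include <cassert>
#include <chrono>
#include <type_traits>

// Use high_resolution_clock only if it is steady; otherwise fall back to
// steady_clock, which the standard guarantees to be monotonic.
using IntervalClock =
    std::conditional<std::chrono::high_resolution_clock::is_steady,
                     std::chrono::high_resolution_clock,
                     std::chrono::steady_clock>::type;

static_assert(IntervalClock::is_steady, "interval clock must be monotonic");

// Milliseconds elapsed since 'start'; never negative for a steady clock.
std::chrono::milliseconds elapsedSince(IntervalClock::time_point start) {
  return std::chrono::duration_cast<std::chrono::milliseconds>(
      IntervalClock::now() - start);
}
```

In practice, simply using `steady_clock` everywhere, as suggested, is the simpler choice; the alias only shows what the `is_steady` condition means.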


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r385386846
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -391,322 +463,122 @@ void ConsumeWindowsEventLog::substituteXMLPercentageItems(pugi::xml_document& do
   doc.traverse(treeWalker);
 }
 
-void ConsumeWindowsEventLog::processEvent(EVT_HANDLE hEvent) {
+bool ConsumeWindowsEventLog::createEventRender(EVT_HANDLE hEvent, EventRender& eventRender) {
   DWORD size = 0;
   DWORD used = 0;
   DWORD propertyCount = 0;
-  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount)) {
-    if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) {
-      if (used > maxBufferSize_) {
-        logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
-        return;
-      }
-
-      size = used;
-      std::vector<wchar_t> buf(size / 2 + 1);
-      if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
-        logger_->log_error("!EvtRender error: %d.", GetLastError());
-        return;
-      }
-
-      std::string xml = wel::to_string(&buf[0]);
-
-      pugi::xml_document doc;
-      pugi::xml_parse_result result = doc.load_string(xml.c_str());
-
-      if (!result) {
-        logger_->log_error("Invalid XML produced");
-        return;
-      }
-      // this is a well known path. 
-      std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
-      wel::MetadataWalker walker(getEventLogHandler(providerName).getMetadata(), channel_, hEvent, !resolve_as_attributes_, apply_identifier_function_, regex_);
-
-      // resolve the event metadata
-      doc.traverse(walker);
-
-      EventRender renderedData;
-
-      if (writePlainText_) {
-        auto handler = getEventLogHandler(providerName);
-        auto message = handler.getEventMessage(hEvent);
-
-        if (!message.empty()) {
-
-          for (const auto &mapEntry : walker.getIdentifiers()) {
-            // replace the identifiers with their translated strings.
-            utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
-          }
-          wel::WindowsEventLogHeader log_header(header_names_);
-          // set the delimiter
-          log_header.setDelimiter(header_delimiter_);
-          // render the header.
-          renderedData.rendered_text_ = log_header.getEventHeader(&walker);
-          renderedData.rendered_text_ += "Message" + header_delimiter_ + " ";
-          renderedData.rendered_text_ += message;
-        }
-      }
-
-      if (writeXML_) {
-        substituteXMLPercentageItems(doc);
-
-        if (resolve_as_attributes_) {
-          renderedData.matched_fields_ = walker.getFieldValues();
-        }
-
-        wel::XmlString writer;
-        doc.print(writer, "", pugi::format_raw); // no indentation or formatting
-        xml = writer.xml_;
-
-        renderedData.text_ = std::move(xml);
-      }
-
-      if (pBookmark_) {
-        std::wstring bookmarkXml;
-        if (pBookmark_->getNewBookmarkXml(hEvent, bookmarkXml)) {
-          renderedData.bookmarkXml_ = bookmarkXml;
-        }
-      }
-
-      listRenderedData_.enqueue(std::move(renderedData));
-    }
+  EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount);
+  if (ERROR_INSUFFICIENT_BUFFER != GetLastError()) {
+    logger_->log_error("!EvtRender error %d.", GetLastError());
+    return false;
   }
-}
 
-bool ConsumeWindowsEventLog::processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring& channel, const std::wstring& query) {
-  if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark)) {
-    logger_->log_error("!EvtSeek error %d.", GetLastError());
+  if (used > maxBufferSize_) {
+    logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
 
 Review comment:
  For `uint64_t`, the `PRIu64` macro from `<cinttypes>` should be used, since nothing guarantees that `uint64_t` is a typedef for `unsigned long long int`. On my machine, it's a typedef for `unsigned long int`, and I get warnings for those format strings.
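A small sketch of the suggested format string, using a hypothetical `describeLimit` helper. `PRIu64` expands to the conversion suffix for `uint64_t` on the current platform (for example `llu` or `lu`), so the `%` stays in the string literal. Note also that `%ll` on its own is incomplete: it has a length modifier but no conversion specifier.

```cpp
#include <cassert>
#include <cinttypes>  // PRIu64
#include <cstdint>
#include <cstdio>
#include <string>

// Hypothetical helper: formats a byte limit held in a uint64_t without
// assuming which integer type uint64_t is a typedef for.
std::string describeLimit(std::uint64_t maxBufferSize) {
  char buf[96];
  std::snprintf(buf, sizeof(buf),
                "couldn't be rendered within %" PRIu64 " bytes.", maxBufferSize);
  return buf;
}
```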


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r386522981
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,19 +280,88 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  struct TimeDiff {
+    auto operator()() const {
+      return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - time_).count();
+    }
+    const decltype(std::chrono::steady_clock::now()) time_ = std::chrono::steady_clock::now();
+  };
+
+  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml) {
+    const TimeDiff timeDiff;
+    session->commit();
+    logger_->log_debug("processQueue commit took %" PRIu64 " ms", (uint64_t)timeDiff());
 
 Review comment:
  We need the cast because of a `narrowing conversion` error.
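The thread doesn't show exactly where the narrowing error is reported, so this is an assumption: a common trigger is brace-initializing an unsigned value from `milliseconds::count()`, which returns a signed type. A sketch with a hypothetical `toMillis` helper:

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical helper. milliseconds::count() returns a signed integer type,
// so `std::uint64_t ms{d.count()};` would be a narrowing conversion and
// fail to compile. The explicit cast states that the (non-negative) value
// is intentionally converted to unsigned.
std::uint64_t toMillis(std::chrono::steady_clock::duration d) {
  const auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(d);
  return static_cast<std::uint64_t>(ms.count());
}
```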


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r385564680
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,19 +279,84 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml){
+    const auto before_commit = std::chrono::high_resolution_clock::now();
+    session->commit();
+    logger_->log_debug("processQueue commit took %llu ms",
+      std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_commit).count());
+
+    pBookmark_->saveBookmarkXml(bookmarkXml);
+
+    if (session->outgoingConnectionsFull("success")) {
+      return false;
+    }
+
+    return true;
+  };
 
-  const auto now = GetTickCount64();
+  size_t eventCount = 0;
+  const auto before_time = std::chrono::high_resolution_clock::now();
+  utils::ScopeGuard timeGuard([&]() {
+    logger_->log_debug("processed %d Events in %llu ms",
+                       eventCount,
+                       std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - before_time).count());
 
 Review comment:
   Fixed.


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r383786009
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.h
 ##########
 @@ -101,14 +100,12 @@ class ConsumeWindowsEventLog : public core::Processor
   
 
 protected:
-  bool subscribe(const std::shared_ptr<core::ProcessContext> &context);
-  void unsubscribe();
-  int processQueue(const std::shared_ptr<core::ProcessSession> &session);
+  void processEventsAfterBookmark(core::ProcessSession& session);
+  void processEventRender(const EventRender& renderedData, core::ProcessSession& session);
   wel::WindowsEventLogHandler getEventLogHandler(const std::string & name);
   bool insertHeaderName(wel::METADATA_NAMES &header, const std::string &key, const std::string &value);
   void LogWindowsError();
-  void processEvent(EVT_HANDLE eventHandle);
-  bool processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring& channel, const std::wstring& query);
+  bool processEvent(EVT_HANDLE eventHandle, EventRender& renderedData);
 
 Review comment:
   Fixed.


[GitHub] [nifi-minifi-cpp] bakaid closed pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
bakaid closed pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741
 
 
   


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r385725765
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,18 +280,83 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml){
+    const auto before_commit = std::chrono::steady_clock::now();
+    session->commit();
+    logger_->log_debug("processQueue commit took %llu ms",
+                       std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - before_commit).count());
 
 Review comment:
  There's already a thread about `%llu`, but this case is somewhat different: here the standard doesn't specify the returned type, only its minimum number of bits.
  I suggest assigning the result of `milliseconds::count` to a `uint64_t` (where it will definitely fit), then printing it as a normal `uint64_t`, i.e. with the `"%" PRIu64` format specifier.
  
  (This applies in both cases.)
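The guarantee mentioned above can be checked at compile time. A sketch, assuming C++11 or later: the standard requires `milliseconds::rep` to be a signed integer type of at least 45 bits. The last check encodes an assumption rather than a guarantee: on mainstream standard libraries `rep` is 64 bits wide, so a non-negative count fits in `uint64_t`.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <limits>

using MillisRep = std::chrono::milliseconds::rep;

// Required by the standard: a signed integer type of at least 45 bits,
// i.e. at least 44 value bits plus the sign bit.
static_assert(std::numeric_limits<MillisRep>::is_signed,
              "milliseconds::rep must be signed");
static_assert(std::numeric_limits<MillisRep>::digits >= 44,
              "milliseconds::rep must have at least 45 bits");

// Assumption, not a guarantee: rep is no wider than 64 bits, so every
// non-negative count is representable in uint64_t.
static_assert(std::numeric_limits<MillisRep>::digits <=
                  std::numeric_limits<std::uint64_t>::digits,
              "non-negative milliseconds counts fit in uint64_t");
```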


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r385073999
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -191,23 +192,14 @@ bool ConsumeWindowsEventLog::insertHeaderName(wel::METADATA_NAMES &header, const
 }
 
 void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  sessionFactory_ = sessionFactory;
 
 Review comment:
   `sessionFactory_` is not used anymore, it should be removed.


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r385564315
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,19 +279,84 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml){
+    const auto before_commit = std::chrono::high_resolution_clock::now();
+    session->commit();
+    logger_->log_debug("processQueue commit took %llu ms",
+      std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_commit).count());
+
+    pBookmark_->saveBookmarkXml(bookmarkXml);
+
+    if (session->outgoingConnectionsFull("success")) {
 
 Review comment:
   Fixed.


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r385894354
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,18 +280,83 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml){
+    const auto before_commit = std::chrono::steady_clock::now();
+    session->commit();
+    logger_->log_debug("processQueue commit took %llu ms",
+                       std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - before_commit).count());
 
 Review comment:
   Fixed.


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r385564528
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -391,322 +463,122 @@ void ConsumeWindowsEventLog::substituteXMLPercentageItems(pugi::xml_document& do
   doc.traverse(treeWalker);
 }
 
-void ConsumeWindowsEventLog::processEvent(EVT_HANDLE hEvent) {
+bool ConsumeWindowsEventLog::createEventRender(EVT_HANDLE hEvent, EventRender& eventRender) {
   DWORD size = 0;
   DWORD used = 0;
   DWORD propertyCount = 0;
-  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount)) {
-    if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) {
-      if (used > maxBufferSize_) {
-        logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
-        return;
-      }
-
-      size = used;
-      std::vector<wchar_t> buf(size / 2 + 1);
-      if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
-        logger_->log_error("!EvtRender error: %d.", GetLastError());
-        return;
-      }
-
-      std::string xml = wel::to_string(&buf[0]);
-
-      pugi::xml_document doc;
-      pugi::xml_parse_result result = doc.load_string(xml.c_str());
-
-      if (!result) {
-        logger_->log_error("Invalid XML produced");
-        return;
-      }
-      // this is a well known path. 
-      std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
-      wel::MetadataWalker walker(getEventLogHandler(providerName).getMetadata(), channel_, hEvent, !resolve_as_attributes_, apply_identifier_function_, regex_);
-
-      // resolve the event metadata
-      doc.traverse(walker);
-
-      EventRender renderedData;
-
-      if (writePlainText_) {
-        auto handler = getEventLogHandler(providerName);
-        auto message = handler.getEventMessage(hEvent);
-
-        if (!message.empty()) {
-
-          for (const auto &mapEntry : walker.getIdentifiers()) {
-            // replace the identifiers with their translated strings.
-            utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
-          }
-          wel::WindowsEventLogHeader log_header(header_names_);
-          // set the delimiter
-          log_header.setDelimiter(header_delimiter_);
-          // render the header.
-          renderedData.rendered_text_ = log_header.getEventHeader(&walker);
-          renderedData.rendered_text_ += "Message" + header_delimiter_ + " ";
-          renderedData.rendered_text_ += message;
-        }
-      }
-
-      if (writeXML_) {
-        substituteXMLPercentageItems(doc);
-
-        if (resolve_as_attributes_) {
-          renderedData.matched_fields_ = walker.getFieldValues();
-        }
-
-        wel::XmlString writer;
-        doc.print(writer, "", pugi::format_raw); // no indentation or formatting
-        xml = writer.xml_;
-
-        renderedData.text_ = std::move(xml);
-      }
-
-      if (pBookmark_) {
-        std::wstring bookmarkXml;
-        if (pBookmark_->getNewBookmarkXml(hEvent, bookmarkXml)) {
-          renderedData.bookmarkXml_ = bookmarkXml;
-        }
-      }
-
-      listRenderedData_.enqueue(std::move(renderedData));
-    }
+  EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount);
+  if (ERROR_INSUFFICIENT_BUFFER != GetLastError()) {
+    logger_->log_error("!EvtRender error %d.", GetLastError());
+    return false;
   }
-}
 
-bool ConsumeWindowsEventLog::processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring& channel, const std::wstring& query) {
-  if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark)) {
-    logger_->log_error("!EvtSeek error %d.", GetLastError());
+  if (used > maxBufferSize_) {
+    logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
     return false;
   }
 
-  // Enumerate the events in the result set after the bookmarked event.
-  while (true) {
-    EVT_HANDLE hEvent{};
-    DWORD dwReturned{};
-    if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
-      DWORD status = ERROR_SUCCESS;
-      if (ERROR_NO_MORE_ITEMS != (status = GetLastError())) {
-        logger_->log_error("!EvtNext error %d.", status);
-      }
-      break;
-    }
-
-    processEvent(hEvent);
-
-    EvtClose(hEvent);
+  size = used;
+  std::vector<wchar_t> buf(size / 2 + 1);
+  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
+    logger_->log_error("!EvtRender error: %d.", GetLastError());
 
 Review comment:
   Fixed by using %x.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r386302485
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -391,322 +463,122 @@ void ConsumeWindowsEventLog::substituteXMLPercentageItems(pugi::xml_document& do
   doc.traverse(treeWalker);
 }
 
-void ConsumeWindowsEventLog::processEvent(EVT_HANDLE hEvent) {
+bool ConsumeWindowsEventLog::createEventRender(EVT_HANDLE hEvent, EventRender& eventRender) {
   DWORD size = 0;
   DWORD used = 0;
   DWORD propertyCount = 0;
-  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount)) {
-    if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) {
-      if (used > maxBufferSize_) {
-        logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
-        return;
-      }
-
-      size = used;
-      std::vector<wchar_t> buf(size / 2 + 1);
-      if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
-        logger_->log_error("!EvtRender error: %d.", GetLastError());
-        return;
-      }
-
-      std::string xml = wel::to_string(&buf[0]);
-
-      pugi::xml_document doc;
-      pugi::xml_parse_result result = doc.load_string(xml.c_str());
-
-      if (!result) {
-        logger_->log_error("Invalid XML produced");
-        return;
-      }
-      // this is a well known path. 
-      std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
-      wel::MetadataWalker walker(getEventLogHandler(providerName).getMetadata(), channel_, hEvent, !resolve_as_attributes_, apply_identifier_function_, regex_);
-
-      // resolve the event metadata
-      doc.traverse(walker);
-
-      EventRender renderedData;
-
-      if (writePlainText_) {
-        auto handler = getEventLogHandler(providerName);
-        auto message = handler.getEventMessage(hEvent);
-
-        if (!message.empty()) {
-
-          for (const auto &mapEntry : walker.getIdentifiers()) {
-            // replace the identifiers with their translated strings.
-            utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
-          }
-          wel::WindowsEventLogHeader log_header(header_names_);
-          // set the delimiter
-          log_header.setDelimiter(header_delimiter_);
-          // render the header.
-          renderedData.rendered_text_ = log_header.getEventHeader(&walker);
-          renderedData.rendered_text_ += "Message" + header_delimiter_ + " ";
-          renderedData.rendered_text_ += message;
-        }
-      }
-
-      if (writeXML_) {
-        substituteXMLPercentageItems(doc);
-
-        if (resolve_as_attributes_) {
-          renderedData.matched_fields_ = walker.getFieldValues();
-        }
-
-        wel::XmlString writer;
-        doc.print(writer, "", pugi::format_raw); // no indentation or formatting
-        xml = writer.xml_;
-
-        renderedData.text_ = std::move(xml);
-      }
-
-      if (pBookmark_) {
-        std::wstring bookmarkXml;
-        if (pBookmark_->getNewBookmarkXml(hEvent, bookmarkXml)) {
-          renderedData.bookmarkXml_ = bookmarkXml;
-        }
-      }
-
-      listRenderedData_.enqueue(std::move(renderedData));
-    }
+  EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount);
+  if (ERROR_INSUFFICIENT_BUFFER != GetLastError()) {
+    logger_->log_error("!EvtRender error %d.", GetLastError());
+    return false;
   }
-}
 
-bool ConsumeWindowsEventLog::processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring& channel, const std::wstring& query) {
-  if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark)) {
-    logger_->log_error("!EvtSeek error %d.", GetLastError());
+  if (used > maxBufferSize_) {
+    logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
 
 Review comment:
   @amarmer The `%x` is still not replaced; please see my previous comment here about why that is necessary.
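   A minimal sketch of specifiers that match the argument types, assuming the logger forwards to `snprintf`-style formatting and assuming the concern is that `EVT_HANDLE` is a `typedef` of the pointer type `HANDLE` (so `%x`, which expects an `unsigned int`, does not match it), while `%ll` alone is an incomplete conversion. The helper `formatDropMessage` and the `void*` stand-in are hypothetical:

```cpp
#include <cassert>
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <string>

// Hypothetical stand-in: on Windows, EVT_HANDLE is a typedef of HANDLE (void*).
using EvtHandleStandIn = void*;

// Builds the "dropping event" message with matching specifiers:
// %p for the pointer-sized handle, PRIu64 for the uint64_t buffer limit.
std::string formatDropMessage(EvtHandleStandIn hEvent, uint64_t maxBufferSize) {
  char buf[256];
  std::snprintf(buf, sizeof(buf),
                "Dropping event %p because it couldn't be rendered within %" PRIu64 " bytes.",
                hEvent, maxBufferSize);
  return buf;
}
```

   The exact text printed for `%p` is implementation-defined, so only the numeric part of the message is predictable.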

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r386539946
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,19 +280,88 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  struct TimeDiff {
+    auto operator()() const {
 
 Review comment:
   This suggests that the underlying type of `milliseconds` is a signed type and a signed -> unsigned conversion is narrowing. Narrowing conversions are not allowed with direct/copy list-initialization.
   
   I was wrong when suggesting `uint64_t`, because the underlying type is a signed integer type. Try `int64_t`; that will most likely work. The standard only specifies "signed integer type of at least 45 bits". https://en.cppreference.com/w/cpp/chrono/duration
   Please keep the list initialization to prevent lossy conversion in the future if the underlying type changes to a wider one, like `int128_t`.
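   To illustrate the narrowing rule, a minimal sketch (the `elapsedMs` helper is hypothetical, not the PR's `TimeDiff`): list-initializing an `int64_t` from `count()` compiles as long as `rep` is no wider than 64 bits, and would stop compiling if `rep` ever became wider, which is exactly the protection being asked for.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <type_traits>

// The standard only guarantees that milliseconds::rep is a signed integer type
// of at least 45 bits.
static_assert(std::is_signed<std::chrono::milliseconds::rep>::value,
              "milliseconds::rep is expected to be signed");

// Returns the elapsed whole milliseconds between two steady_clock time points.
int64_t elapsedMs(std::chrono::steady_clock::time_point start,
                  std::chrono::steady_clock::time_point now) {
  // OK: signed -> signed of the same (or greater) width is not narrowing.
  // `uint64_t ms{...}` would be ill-formed: converting a non-constant signed
  // value to an unsigned type is a narrowing conversion.
  const int64_t ms{std::chrono::duration_cast<std::chrono::milliseconds>(now - start).count()};
  return ms;
}
```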

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r383333144
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.h
 ##########
 @@ -101,14 +100,12 @@ class ConsumeWindowsEventLog : public core::Processor
   
 
 protected:
-  bool subscribe(const std::shared_ptr<core::ProcessContext> &context);
-  void unsubscribe();
-  int processQueue(const std::shared_ptr<core::ProcessSession> &session);
+  void processEventsAfterBookmark(core::ProcessSession& session);
+  void processEventRender(const EventRender& renderedData, core::ProcessSession& session);
   wel::WindowsEventLogHandler getEventLogHandler(const std::string & name);
   bool insertHeaderName(wel::METADATA_NAMES &header, const std::string &key, const std::string &value);
   void LogWindowsError();
-  void processEvent(EVT_HANDLE eventHandle);
-  bool processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring& channel, const std::wstring& query);
+  bool processEvent(EVT_HANDLE eventHandle, EventRender& renderedData);
 
 Review comment:
   This member function deserves a Doxygen-style comment describing the behavior and the meaning of the return value, since it's not obvious by looking at the signature.
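   One possible shape for such a comment, written from the behavior visible in the `processEvent` definition in this PR. The `EVT_HANDLE` and `EventRender` definitions below are hypothetical stand-ins that exist only so the sketch compiles outside Windows:

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-ins; in the PR, EVT_HANDLE comes from <winevt.h> and
// EventRender is the processor's own struct.
using EVT_HANDLE = void*;
struct EventRender {
  std::string text_;
  std::string rendered_text_;
};

class ConsumeWindowsEventLogSketch {
 protected:
  /**
   * Renders a single event and stores the result in renderedData.
   *
   * The event is rendered as XML and its metadata is resolved; renderedData.text_
   * is filled when XML output is enabled, and renderedData.rendered_text_ when
   * plain-text output is enabled and the event has a message.
   *
   * @param eventHandle handle of the event to render; this function does not close it.
   * @param renderedData output parameter, meaningful only when true is returned.
   * @return true if the event was rendered; false if EvtRender failed, the rendered
   *         XML would exceed the configured maximum buffer size, or the produced
   *         XML could not be parsed.
   */
  bool processEvent(EVT_HANDLE eventHandle, EventRender& renderedData);
};
```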

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r385564023
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -191,23 +192,14 @@ bool ConsumeWindowsEventLog::insertHeaderName(wel::METADATA_NAMES &header, const
 }
 
 void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  sessionFactory_ = sessionFactory;
 
 Review comment:
   Fixed.

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r385103450
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,19 +279,84 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml){
+    const auto before_commit = std::chrono::high_resolution_clock::now();
+    session->commit();
+    logger_->log_debug("processQueue commit took %llu ms",
+      std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_commit).count());
+
+    pBookmark_->saveBookmarkXml(bookmarkXml);
+
+    if (session->outgoingConnectionsFull("success")) {
 
 Review comment:
   A debug level log here would really help in debugging.

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r385138603
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -248,21 +239,38 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
     }
   }
 
-  if (subscriptionHandle_) {
-    logger_->log_error("Processor already subscribed to Event Log, expected cleanup to unsubscribe.");
-  } else {
-    sessionFactory_ = sessionFactory;
+  context->getProperty(Channel.getName(), channel_);
+  wstrChannel_ = std::wstring(channel_.begin(), channel_.end());
 
-    subscribe(context);
+  std::string query;
+  context->getProperty(Query.getName(), query);
+  wstrQuery_ = std::wstring(query.begin(), query.end());
+
+  if (!pBookmark_) {
+    std::string bookmarkDir;
+    context->getProperty(BookmarkRootDirectory.getName(), bookmarkDir);
+    if (bookmarkDir.empty()) {
+      logger_->log_error("State Directory is empty");
+      return;
+    }
+    pBookmark_ = std::make_unique<Bookmark>(wstrChannel_, wstrQuery_, bookmarkDir, getUUIDStr(), logger_);
+    if (!*pBookmark_) {
+      pBookmark_.reset();
+      return;
+    }
   }
+
+  context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
+  logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %lld", maxBufferSize_);
 
 Review comment:
   `maxBufferSize_` is a `uint64_t`, so `%llu` should be used.
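   A minimal sketch of the suggested fix, assuming printf-style formatting as in the other logger calls; the `formatMaxBufferSize` helper is hypothetical. The explicit cast keeps the argument and the specifier in agreement even where `uint64_t` is `unsigned long` rather than `unsigned long long`:

```cpp
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <string>

// Formats the configured limit with %llu; values above INT64_MAX print correctly,
// which %lld would not guarantee (a signedness mismatch there is undefined behavior).
std::string formatMaxBufferSize(uint64_t maxBufferSize) {
  char buf[96];
  std::snprintf(buf, sizeof(buf), "ConsumeWindowsEventLog: maxBufferSize_ %llu",
                static_cast<unsigned long long>(maxBufferSize));
  return buf;
}
```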

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r383252704
 
 

 ##########
 File path: extensions/windows-event-log/Bookmark.cpp
 ##########
 @@ -205,7 +206,7 @@ bool Bookmark::getBookmarkXmlFromFile(std::wstring& bookmarkXml) {
   // '!' should be at the end of bookmark.
   auto pos = bookmarkXml.find(L'!');
   if (std::wstring::npos == pos) {
-    logger_->log_error("No '!' in bookmarXml '%s'", bookmarkXml.c_str());
+    logger_->log_error("No '!' in bookmarXml '%ws'", bookmarkXml.c_str());
 
 Review comment:
   I appreciate the intention, but there is no `w` conversion specifier for [`printf`](https://pubs.opengroup.org/onlinepubs/009695399/functions/fprintf.html). Use `%ls` for `wchar_t*` wide C strings.
   
   Even though spdlog uses cppformat (nowadays called {fmt}, the library `std::format` is based on) internally, its docs reference the printf man page for the format string specification, so the behavior is undocumented for an invalid format string (probably a `runtime_error` is thrown).
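   A minimal sketch of `%ls` with a narrow `snprintf`, assuming the logger ultimately uses printf-style formatting; the `formatNoBangError` helper is hypothetical. `%ls` converts the wide string using the current C locale, so characters the locale cannot represent make the call fail:

```cpp
#include <cassert>
#include <cstdio>
#include <string>

// %ls expects a const wchar_t* and converts it to a multibyte string according to
// the current LC_CTYPE locale; snprintf returns a negative value on conversion failure.
std::string formatNoBangError(const std::wstring& bookmarkXml) {
  char buf[512];
  const int n = std::snprintf(buf, sizeof(buf), "No '!' in bookmarkXml '%ls'", bookmarkXml.c_str());
  if (n < 0) {
    return "No '!' in bookmarkXml (unprintable)";
  }
  return buf;  // truncated but still null-terminated if the input was very long
}
```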

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r383786218
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -391,322 +467,122 @@ void ConsumeWindowsEventLog::substituteXMLPercentageItems(pugi::xml_document& do
   doc.traverse(treeWalker);
 }
 
-void ConsumeWindowsEventLog::processEvent(EVT_HANDLE hEvent) {
+bool ConsumeWindowsEventLog::processEvent(EVT_HANDLE hEvent, EventRender& renderedData) {
   DWORD size = 0;
   DWORD used = 0;
   DWORD propertyCount = 0;
-  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount)) {
-    if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) {
-      if (used > maxBufferSize_) {
-        logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
-        return;
-      }
-
-      size = used;
-      std::vector<wchar_t> buf(size / 2 + 1);
-      if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
-        logger_->log_error("!EvtRender error: %d.", GetLastError());
-        return;
-      }
-
-      std::string xml = wel::to_string(&buf[0]);
-
-      pugi::xml_document doc;
-      pugi::xml_parse_result result = doc.load_string(xml.c_str());
-
-      if (!result) {
-        logger_->log_error("Invalid XML produced");
-        return;
-      }
-      // this is a well known path. 
-      std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
-      wel::MetadataWalker walker(getEventLogHandler(providerName).getMetadata(), channel_, hEvent, !resolve_as_attributes_, apply_identifier_function_, regex_);
-
-      // resolve the event metadata
-      doc.traverse(walker);
-
-      EventRender renderedData;
-
-      if (writePlainText_) {
-        auto handler = getEventLogHandler(providerName);
-        auto message = handler.getEventMessage(hEvent);
-
-        if (!message.empty()) {
-
-          for (const auto &mapEntry : walker.getIdentifiers()) {
-            // replace the identifiers with their translated strings.
-            utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
-          }
-          wel::WindowsEventLogHeader log_header(header_names_);
-          // set the delimiter
-          log_header.setDelimiter(header_delimiter_);
-          // render the header.
-          renderedData.rendered_text_ = log_header.getEventHeader(&walker);
-          renderedData.rendered_text_ += "Message" + header_delimiter_ + " ";
-          renderedData.rendered_text_ += message;
-        }
-      }
-
-      if (writeXML_) {
-        substituteXMLPercentageItems(doc);
-
-        if (resolve_as_attributes_) {
-          renderedData.matched_fields_ = walker.getFieldValues();
-        }
-
-        wel::XmlString writer;
-        doc.print(writer, "", pugi::format_raw); // no indentation or formatting
-        xml = writer.xml_;
-
-        renderedData.text_ = std::move(xml);
-      }
-
-      if (pBookmark_) {
-        std::wstring bookmarkXml;
-        if (pBookmark_->getNewBookmarkXml(hEvent, bookmarkXml)) {
-          renderedData.bookmarkXml_ = bookmarkXml;
-        }
-      }
-
-      listRenderedData_.enqueue(std::move(renderedData));
-    }
+  EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount);
+  if (ERROR_INSUFFICIENT_BUFFER != GetLastError()) {
+    logger_->log_error("!EvtRender error %d.", GetLastError());
+    return false;
   }
-}
 
-bool ConsumeWindowsEventLog::processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring& channel, const std::wstring& query) {
-  if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark)) {
-    logger_->log_error("!EvtSeek error %d.", GetLastError());
+  if (used > maxBufferSize_) {
+    logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
     return false;
   }
 
-  // Enumerate the events in the result set after the bookmarked event.
-  while (true) {
-    EVT_HANDLE hEvent{};
-    DWORD dwReturned{};
-    if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
-      DWORD status = ERROR_SUCCESS;
-      if (ERROR_NO_MORE_ITEMS != (status = GetLastError())) {
-        logger_->log_error("!EvtNext error %d.", status);
-      }
-      break;
-    }
-
-    processEvent(hEvent);
-
-    EvtClose(hEvent);
+  size = used;
+  std::vector<wchar_t> buf(size / 2 + 1);
+  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
+    logger_->log_error("!EvtRender error: %d.", GetLastError());
+    return false;
   }
 
-  return true;
-}
-
+  std::string xml = wel::to_string(&buf[0]);
 
-bool ConsumeWindowsEventLog::subscribe(const std::shared_ptr<core::ProcessContext> &context) {
-  context->getProperty(Channel.getName(), channel_);
-  context->getProperty(Query.getName(), query_);
+  pugi::xml_document doc;
+  pugi::xml_parse_result result = doc.load_string(xml.c_str());
 
-  context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
-  logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %lld", maxBufferSize_);
-
-  provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query_;
-
-  std::string strInactiveDurationToReconnect;
-  context->getProperty(InactiveDurationToReconnect.getName(), strInactiveDurationToReconnect);
-
-  // Get 'inactiveDurationToReconnect_'.
-  core::TimeUnit unit;
-  if (core::Property::StringToTime(strInactiveDurationToReconnect, inactiveDurationToReconnect_, unit) &&
-    core::Property::ConvertTimeUnitToMS(inactiveDurationToReconnect_, unit, inactiveDurationToReconnect_)) {
-    logger_->log_info("inactiveDurationToReconnect: [%lld] ms", inactiveDurationToReconnect_);
-  }
-
-  if (!pBookmark_) {
-    logger_->log_error("!pBookmark_");
+  if (!result) {
+    logger_->log_error("Invalid XML produced");
     return false;
   }
 
-  auto channel = std::wstring(channel_.begin(), channel_.end());
-  auto query = std::wstring(query_.begin(), query_.end());
+  // this is a well known path. 
+  std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
+  wel::MetadataWalker walker(getEventLogHandler(providerName).getMetadata(), channel_, hEvent, !resolve_as_attributes_, apply_identifier_function_, regex_);
 
-  do {
-    auto hEventResults = EvtQuery(0, channel.c_str(), query.c_str(), EvtQueryChannelPath);
-    if (!hEventResults) {
-      logger_->log_error("!EvtQuery error: %d.", GetLastError());
-      // Consider it as a serious error.
-      return false;
-    }
-    const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults); });
+  // resolve the event metadata
+  doc.traverse(walker);
 
-    if (pBookmark_->hasBookmarkXml()) {
-      if (!processEventsAfterBookmark(hEventResults, channel, query)) {
-        break;
-      }
-    } else {
-      // Seek to the last event in the hEventResults.
-      if (!EvtSeek(hEventResults, 0, 0, 0, EvtSeekRelativeToLast)) {
-        logger_->log_error("!EvtSeek error: %d.", GetLastError());
-        break;
-      }
+  if (writePlainText_) {
+    auto handler = getEventLogHandler(providerName);
+    auto message = handler.getEventMessage(hEvent);
 
-      DWORD dwReturned{};
-      EVT_HANDLE hEvent{};
-      if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
-        logger_->log_error("!EvtNext error: %d.", GetLastError());
-        break;
+    if (!message.empty()) {
+
+      for (const auto &mapEntry : walker.getIdentifiers()) {
+        // replace the identifiers with their translated strings.
+        utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
       }
+      wel::WindowsEventLogHeader log_header(header_names_);
+      // set the delimiter
+      log_header.setDelimiter(header_delimiter_);
+      // render the header.
+      renderedData.rendered_text_ = log_header.getEventHeader(&walker);
+      renderedData.rendered_text_ += "Message" + header_delimiter_ + " ";
+      renderedData.rendered_text_ += message;
+    }
+  }
 
-      pBookmark_->saveBookmark(hEvent);
+  if (writeXML_) {
+    substituteXMLPercentageItems(doc);
+
+    if (resolve_as_attributes_) {
+      renderedData.matched_fields_ = walker.getFieldValues();
     }
-  } while (false);
-
-  subscriptionHandle_ = EvtSubscribe(
-      NULL,
-      NULL,
-      channel.c_str(),
-      query.c_str(),
-      NULL,
-      this,
-      [](EVT_SUBSCRIBE_NOTIFY_ACTION action, PVOID pContext, EVT_HANDLE hEvent)
-      {
-        auto pConsumeWindowsEventLog = static_cast<ConsumeWindowsEventLog*>(pContext);
-
-        auto& logger = pConsumeWindowsEventLog->logger_;
-
-        if (action == EvtSubscribeActionError) {
-          if (ERROR_EVT_QUERY_RESULT_STALE == (DWORD)hEvent) {
-            logger->log_error("Received missing event notification. Consider triggering processor more frequently or increasing queue size.");
-          } else {
-            logger->log_error("Received the following Win32 error: %x", hEvent);
-          }
-        } else if (action == EvtSubscribeActionDeliver) {
-          pConsumeWindowsEventLog->processEvent(hEvent);
-        }
 
-        return 0UL;
-      },
-      EvtSubscribeToFutureEvents | EvtSubscribeStrict);
+    wel::XmlString writer;
+    doc.print(writer, "", pugi::format_raw); // no indentation or formatting
+    xml = writer.xml_;
 
-  if (!subscriptionHandle_) {
-    logger_->log_error("Unable to subscribe with provided parameters, received the following error code: %d", GetLastError());
-    return false;
+    renderedData.text_ = std::move(xml);
   }
 
-  lastActivityTimestamp_ = GetTickCount64();
-
   return true;
 }
 
-void ConsumeWindowsEventLog::unsubscribe()
-{
-  if (subscriptionHandle_) {
-    EvtClose(subscriptionHandle_);
-    subscriptionHandle_ = 0;
-  }
-}
-
-int ConsumeWindowsEventLog::processQueue(const std::shared_ptr<core::ProcessSession> &session)
+void ConsumeWindowsEventLog::processEventRender(const EventRender& renderedData, core::ProcessSession& session)
 {
-  struct WriteCallback: public OutputStreamCallback {
+  struct WriteCallback : public OutputStreamCallback {
     WriteCallback(const std::string& str)
-      : data_(str.c_str()), size_(str.length()) {
+      : str_(str.c_str()) {
 
 Review comment:
   Fixed.

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r385564172
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -248,21 +239,38 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
     }
   }
 
-  if (subscriptionHandle_) {
-    logger_->log_error("Processor already subscribed to Event Log, expected cleanup to unsubscribe.");
-  } else {
-    sessionFactory_ = sessionFactory;
+  context->getProperty(Channel.getName(), channel_);
+  wstrChannel_ = std::wstring(channel_.begin(), channel_.end());
 
-    subscribe(context);
+  std::string query;
+  context->getProperty(Query.getName(), query);
+  wstrQuery_ = std::wstring(query.begin(), query.end());
+
+  if (!pBookmark_) {
+    std::string bookmarkDir;
+    context->getProperty(BookmarkRootDirectory.getName(), bookmarkDir);
+    if (bookmarkDir.empty()) {
+      logger_->log_error("State Directory is empty");
+      return;
+    }
+    pBookmark_ = std::make_unique<Bookmark>(wstrChannel_, wstrQuery_, bookmarkDir, getUUIDStr(), logger_);
+    if (!*pBookmark_) {
+      pBookmark_.reset();
+      return;
+    }
   }
+
+  context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
+  logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %lld", maxBufferSize_);
 
 Review comment:
   Fixed.

----------------------------------------------------------------

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r386412146
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,19 +280,88 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  struct TimeDiff {
+    auto operator()() const {
+      return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - time_).count();
+    }
+    const decltype(std::chrono::steady_clock::now()) time_ = std::chrono::steady_clock::now();
+  };
+
+  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml) {
+    const TimeDiff timeDiff;
+    session->commit();
+    logger_->log_debug("processQueue commit took %" PRIu64 " ms", (uint64_t)timeDiff());
+
+    pBookmark_->saveBookmarkXml(bookmarkXml);
+
+    if (session->outgoingConnectionsFull("success")) {
+      logger_->log_debug("outgoingConnectionsFull");
+      return false;
+    }
+
+    return true;
+  };
+
+  size_t eventCount = 0;
+  const TimeDiff timeDiff;
+  utils::ScopeGuard timeGuard([&]() {
+    logger_->log_debug("processed %zu Events in %"  PRIu64 " ms", eventCount, (uint64_t)timeDiff());
+  });
 
-  const auto now = GetTickCount64();
+  size_t commitAndSaveBookmarkCount = 0;
+  std::wstring bookmarkXml;
 
-  if (flowFileCount > 0) {
-    lastActivityTimestamp_ = now;
+  const auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
+  if (!hEventResults) {
+    LOG_LAST_ERROR(EvtQuery);
+    return;
+  }
+  const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults); });
+
+  auto hBookmark = pBookmark_->getBookmarkHandleFromXML();
+  if (!hBookmark) {
+    // Unrecovarable error.
+    pBookmark_.reset();
+    return;
+  }
+
+  if (!EvtSeek(hEventResults, 1, hBookmark, 0, EvtSeekRelativeToBookmark)) {
+    LOG_LAST_ERROR(EvtSeek);
+    return;
   }
-  else if (inactiveDurationToReconnect_ > 0) {
-    if ((now - lastActivityTimestamp_) > inactiveDurationToReconnect_) {
-      logger_->log_info("Exceeds configured 'inactive duration to reconnect' %lld ms. Unsubscribe to reconnect..", inactiveDurationToReconnect_);
-      unsubscribe();
+
+  // Enumerate the events in the result set after the bookmarked event.
+  while (true) {
+    EVT_HANDLE hEvent{};
+    DWORD dwReturned{};
+    if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
+      if (ERROR_NO_MORE_ITEMS != GetLastError()) {
+        LOG_LAST_ERROR(EvtNext);
+      }
+      break;
+    }
+    const utils::ScopeGuard guard_hEvent([hEvent]() { EvtClose(hEvent); });
+
+    EventRender eventRender;
+    std::wstring newBookmarkXml;
+    if (createEventRender(hEvent, eventRender) && pBookmark_->getNewBookmarkXml(hEvent, newBookmarkXml)) {
+      bookmarkXml = std::move(newBookmarkXml);
+      eventCount++;
+      putEventRenderFlowFileToSession(eventRender, *session);
+
+      if (batch_commit_size_ != 0U && (eventCount % batch_commit_size_ == 0)) {
+        if (!commitAndSaveBookmark(bookmarkXml)) {
+          return;
+        }
+
+        commitAndSaveBookmarkCount = eventCount;
+      }
     }
   }
+
+  if (eventCount > commitAndSaveBookmarkCount) {
+    commitAndSaveBookmark(bookmarkXml);
 
 Review comment:
  @szaszm If `batch_commit_size_` is not 0, we must commit once per batch: that is the point of the feature, and it is an important performance consideration (otherwise the uncommitted flow files would take up more and more memory, and the flow would be blocked, because downstream processors can't work on uncommitted flow files).
  This particular code makes sure we safely commit the remaining flow files (or all of them, if `batch_commit_size_ == 0U`) before saving the bookmark. This ordering is required because committing can throw an exception and cause a rollback; if we had already saved the bookmark, those events would be skipped on the next run, causing data loss.
  Another commit will happen automatically when we return from onTrigger, but since there will be no flow files left to commit, it will have little performance impact.
  
  We could change the framework in the future to make it possible to disable autocommit, but until then, accepting an empty commit in exchange for not losing data is the right tradeoff.
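  A minimal sketch of the ordering described above, assuming hypothetical `FakeSession` and `FakeBookmark` stand-ins rather than the real MiNiFi `core::ProcessSession` and `Bookmark` classes: flow files are committed every `batchSize` events (or once at the end when it is 0), and the bookmark is persisted only after a commit succeeds, so a throwing commit leaves the bookmark at the last committed event.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-ins: they only record what happened so the
// commit/bookmark ordering can be inspected.
struct FakeSession {
  std::vector<std::string> pending;    // created but not yet committed
  std::vector<std::string> committed;  // survived a commit
  bool failNextCommit = false;

  void add(const std::string& flowFile) { pending.push_back(flowFile); }

  void commit() {
    if (failNextCommit) {
      pending.clear();  // a failed commit rolls back the pending flow files
      throw std::runtime_error("commit failed");
    }
    committed.insert(committed.end(), pending.begin(), pending.end());
    pending.clear();
  }
};

struct FakeBookmark {
  std::string saved;  // last persisted position
  void save(const std::string& position) { saved = position; }
};

// Commit first, persist the bookmark second: if commit() throws, the
// bookmark still points at the last committed event, so nothing is skipped.
void commitAndSaveBookmark(FakeSession& session, FakeBookmark& bookmark,
                           const std::string& position) {
  session.commit();
  bookmark.save(position);
}

// Commits every batchSize events (0 = only once, at the end).
void processEvents(const std::vector<std::string>& events, std::size_t batchSize,
                   FakeSession& session, FakeBookmark& bookmark) {
  std::size_t eventCount = 0;
  std::size_t committedCount = 0;
  std::string position;
  for (const auto& event : events) {
    session.add(event);
    position = event;
    ++eventCount;
    if (batchSize != 0 && eventCount % batchSize == 0) {
      commitAndSaveBookmark(session, bookmark, position);
      committedCount = eventCount;
    }
  }
  // Flush the remainder (or everything, if batchSize == 0).
  if (eventCount > committedCount) {
    commitAndSaveBookmark(session, bookmark, position);
  }
}
```

  With `batchSize == 2` and five events, this commits after the 2nd and 4th events and once more for the remainder; if that final commit throws, the previously saved position is kept.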
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r385137833
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -391,322 +463,122 @@ void ConsumeWindowsEventLog::substituteXMLPercentageItems(pugi::xml_document& do
   doc.traverse(treeWalker);
 }
 
-void ConsumeWindowsEventLog::processEvent(EVT_HANDLE hEvent) {
+bool ConsumeWindowsEventLog::createEventRender(EVT_HANDLE hEvent, EventRender& eventRender) {
   DWORD size = 0;
   DWORD used = 0;
   DWORD propertyCount = 0;
-  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount)) {
-    if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) {
-      if (used > maxBufferSize_) {
-        logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
-        return;
-      }
-
-      size = used;
-      std::vector<wchar_t> buf(size / 2 + 1);
-      if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
-        logger_->log_error("!EvtRender error: %d.", GetLastError());
-        return;
-      }
-
-      std::string xml = wel::to_string(&buf[0]);
-
-      pugi::xml_document doc;
-      pugi::xml_parse_result result = doc.load_string(xml.c_str());
-
-      if (!result) {
-        logger_->log_error("Invalid XML produced");
-        return;
-      }
-      // this is a well known path. 
-      std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
-      wel::MetadataWalker walker(getEventLogHandler(providerName).getMetadata(), channel_, hEvent, !resolve_as_attributes_, apply_identifier_function_, regex_);
-
-      // resolve the event metadata
-      doc.traverse(walker);
-
-      EventRender renderedData;
-
-      if (writePlainText_) {
-        auto handler = getEventLogHandler(providerName);
-        auto message = handler.getEventMessage(hEvent);
-
-        if (!message.empty()) {
-
-          for (const auto &mapEntry : walker.getIdentifiers()) {
-            // replace the identifiers with their translated strings.
-            utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
-          }
-          wel::WindowsEventLogHeader log_header(header_names_);
-          // set the delimiter
-          log_header.setDelimiter(header_delimiter_);
-          // render the header.
-          renderedData.rendered_text_ = log_header.getEventHeader(&walker);
-          renderedData.rendered_text_ += "Message" + header_delimiter_ + " ";
-          renderedData.rendered_text_ += message;
-        }
-      }
-
-      if (writeXML_) {
-        substituteXMLPercentageItems(doc);
-
-        if (resolve_as_attributes_) {
-          renderedData.matched_fields_ = walker.getFieldValues();
-        }
-
-        wel::XmlString writer;
-        doc.print(writer, "", pugi::format_raw); // no indentation or formatting
-        xml = writer.xml_;
-
-        renderedData.text_ = std::move(xml);
-      }
-
-      if (pBookmark_) {
-        std::wstring bookmarkXml;
-        if (pBookmark_->getNewBookmarkXml(hEvent, bookmarkXml)) {
-          renderedData.bookmarkXml_ = bookmarkXml;
-        }
-      }
-
-      listRenderedData_.enqueue(std::move(renderedData));
-    }
+  EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount);
+  if (ERROR_INSUFFICIENT_BUFFER != GetLastError()) {
+    logger_->log_error("!EvtRender error %d.", GetLastError());
+    return false;
   }
-}
 
-bool ConsumeWindowsEventLog::processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring& channel, const std::wstring& query) {
-  if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark)) {
-    logger_->log_error("!EvtSeek error %d.", GetLastError());
+  if (used > maxBufferSize_) {
+    logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
     return false;
   }
 
-  // Enumerate the events in the result set after the bookmarked event.
-  while (true) {
-    EVT_HANDLE hEvent{};
-    DWORD dwReturned{};
-    if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
-      DWORD status = ERROR_SUCCESS;
-      if (ERROR_NO_MORE_ITEMS != (status = GetLastError())) {
-        logger_->log_error("!EvtNext error %d.", status);
-      }
-      break;
-    }
-
-    processEvent(hEvent);
-
-    EvtClose(hEvent);
+  size = used;
+  std::vector<wchar_t> buf(size / 2 + 1);
+  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
+    logger_->log_error("!EvtRender error: %d.", GetLastError());
 
 Review comment:
  `GetLastError` returns a DWORD, so the format string should use `%u`, not `%d`.
  There are many instances of this in the code.
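  For illustration, a sketch assuming the logger follows `printf` conventions. `DWORD` is redefined locally as `unsigned long` (its Windows typedef) so the snippet compiles off Windows, and `formatWin32Error` is a hypothetical helper. `%lu` matches `unsigned long` exactly; `%u` also works on Windows, where both types are 32 bits wide.

```cpp
#include <cassert>
#include <cstdio>
#include <string>

// Local stand-in; <windows.h> defines DWORD as `unsigned long`.
using DWORD = unsigned long;

// Formats a Win32 error code as an unsigned value, so large codes
// are not printed as negative numbers the way %d would print them.
std::string formatWin32Error(const char* function, DWORD error) {
  char buf[128];
  std::snprintf(buf, sizeof(buf), "!%s error: %lu.", function, error);
  return buf;
}
```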

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r385138933
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,19 +279,84 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml){
+    const auto before_commit = std::chrono::high_resolution_clock::now();
+    session->commit();
+    logger_->log_debug("processQueue commit took %llu ms",
+      std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_commit).count());
+
+    pBookmark_->saveBookmarkXml(bookmarkXml);
+
+    if (session->outgoingConnectionsFull("success")) {
+      return false;
+    }
+
+    return true;
+  };
 
-  const auto now = GetTickCount64();
+  size_t eventCount = 0;
+  const auto before_time = std::chrono::high_resolution_clock::now();
+  utils::ScopeGuard timeGuard([&]() {
+    logger_->log_debug("processed %d Events in %llu ms",
 
 Review comment:
   `eventCount` is a size_t, `%zu` should be used, not `%d`.
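  A similar sketch for `size_t`, again assuming `printf`-style formatting; `formatEventCount` is a hypothetical helper.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <string>

// %zu is the standard conversion for std::size_t.
std::string formatEventCount(std::size_t eventCount) {
  char buf[64];
  std::snprintf(buf, sizeof(buf), "processed %zu Events", eventCount);
  return buf;
}
```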

[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r383485294
 
 

 ##########
 File path: extensions/windows-event-log/Bookmark.cpp
 ##########
 @@ -205,7 +206,7 @@ bool Bookmark::getBookmarkXmlFromFile(std::wstring& bookmarkXml) {
   // '!' should be at the end of bookmark.
   auto pos = bookmarkXml.find(L'!');
   if (std::wstring::npos == pos) {
-    logger_->log_error("No '!' in bookmarXml '%s'", bookmarkXml.c_str());
+    logger_->log_error("No '!' in bookmarXml '%ws'", bookmarkXml.c_str());
 
 Review comment:
  According to https://devblogs.microsoft.com/oldnewthing/20190830-00/?p=102823 it works on Windows. I'll change it to comply with the C standard.
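  For reference, a sketch of the portable form: in standard C and C++, `%ls` is the conversion for a `const wchar_t*` argument, while `%ws` is a Microsoft extension. `describeMissingBang` is a hypothetical helper, assuming `printf`-style formatting.

```cpp
#include <cassert>
#include <cstdio>
#include <cwchar>
#include <string>

// %ls converts the wide string to multibyte characters using the
// current locale; plain ASCII content converts cleanly in the "C" locale.
std::string describeMissingBang(const std::wstring& bookmarkXml) {
  char buf[256];
  std::snprintf(buf, sizeof(buf), "No '!' in bookmarkXml '%ls'", bookmarkXml.c_str());
  return buf;
}
```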

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r386380094
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,19 +280,88 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  struct TimeDiff {
+    auto operator()() const {
+      return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - time_).count();
+    }
+    const decltype(std::chrono::steady_clock::now()) time_ = std::chrono::steady_clock::now();
 
 Review comment:
   Use `auto` instead of `decltype(std::chrono::steady_clock::now())` for simplicity. They are equivalent in this case, since we work with values, not references.
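  One caveat, sketched below: C++ does not allow `auto` as the type of a non-static data member, so for the `time_` member the type would need to be spelled out, for example as `std::chrono::steady_clock::time_point`, which is what `steady_clock::now()` returns. The `int64_t` return type is an assumption for illustration.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Measures elapsed milliseconds since construction.
struct TimeDiff {
  std::int64_t operator()() const {
    return std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::steady_clock::now() - time_).count();
  }
  // `const auto time_ = ...;` would be ill-formed here.
  const std::chrono::steady_clock::time_point time_ = std::chrono::steady_clock::now();
};
```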

[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r383783987
 
 

 ##########
 File path: extensions/windows-event-log/Bookmark.cpp
 ##########
 @@ -205,7 +206,7 @@ bool Bookmark::getBookmarkXmlFromFile(std::wstring& bookmarkXml) {
   // '!' should be at the end of bookmark.
   auto pos = bookmarkXml.find(L'!');
   if (std::wstring::npos == pos) {
-    logger_->log_error("No '!' in bookmarXml '%s'", bookmarkXml.c_str());
+    logger_->log_error("No '!' in bookmarXml '%ws'", bookmarkXml.c_str());
 
 Review comment:
   Fixed.

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r383281807
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -248,21 +239,119 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
     }
   }
 
-  if (subscriptionHandle_) {
-    logger_->log_error("Processor already subscribed to Event Log, expected cleanup to unsubscribe.");
-  } else {
-    sessionFactory_ = sessionFactory;
+  context->getProperty(Channel.getName(), channel_);
+  wstrChannel_ = std::wstring(channel_.begin(), channel_.end());
+
+  std::string query;
+  context->getProperty(Query.getName(), query);
+  wstrQuery_ = std::wstring(query.begin(), query.end());
 
-    subscribe(context);
+  if (!pBookmark_) {
+    std::string bookmarkDir;
+    context->getProperty(BookmarkRootDirectory.getName(), bookmarkDir);
+    if (bookmarkDir.empty()) {
+      logger_->log_error("State Directory is empty");
+      return;
+    }
+    pBookmark_ = std::make_unique<Bookmark>(wstrChannel_, wstrQuery_, bookmarkDir, getUUIDStr(), logger_);
+    if (!*pBookmark_) {
+      pBookmark_.reset();
+      return;
+    }
   }
+
+  context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
+  logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %lld", maxBufferSize_);
+
+  provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query;
 }
 
-void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  if (!subscriptionHandle_) {
-    if (!subscribe(context)) {
-      context->yield();
+void ConsumeWindowsEventLog::processEventsAfterBookmark(core::ProcessSession& session) {
+  // External loop is used in case if there are new events while the events after bookmark are processed.
+  bool hasEvent{};
+  do {
+    hasEvent = false;
+
+    auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
+    if (!hEventResults) {
+      logger_->log_error("!EvtQuery error: %d.", GetLastError());
       return;
     }
+    const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults); });
+
+    if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark)) {
+      logger_->log_error("!EvtSeek error: %d.", GetLastError());
+      return;
+    }
+
+    size_t eventCount = 0;
+    auto before_time = std::chrono::high_resolution_clock::now();
+    utils::ScopeGuard timeGuard([&]() {
+      logger_->log_debug("processed %d Events in %llu ms",
+                         eventCount,
+                         std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_time).count());
+    });
+
+    // Enumerate the events in the result set after the bookmarked event.
+    bool commitAndSaveBookmark{};
+    std::wstring bookmarkXml;
+    while (true) {
+      EVT_HANDLE hEvent{};
+      DWORD dwReturned{};
+      if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
+        DWORD status = ERROR_SUCCESS;
+        if (ERROR_NO_MORE_ITEMS != (status = GetLastError())) {
+          logger_->log_error("!EvtNext error %d.", status);
+        }
+        break;
+      }
+      const utils::ScopeGuard guard_hEvent([hEvent]() { EvtClose(hEvent); });
+
+      commitAndSaveBookmark = false;
+      EventRender renderedData;
+      if (processEvent(hEvent, renderedData)) {
+        if (pBookmark_->getNewBookmarkXml(hEvent, bookmarkXml)) {
+          commitAndSaveBookmark = true;
+
+          eventCount++;
+          processEventRender(renderedData, session);
+
+          if (batch_commit_size_ != 0U && (eventCount % batch_commit_size_ == 0)) {
+            auto before_commit = std::chrono::high_resolution_clock::now();
+            session.commit();
+            logger_->log_debug("processQueue commit took %llu ms",
+                               std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_commit).count());
+
+            pBookmark_->saveBookmarkXml(bookmarkXml);
+
+            if (session.outgoingConnectionsFull("success")) {
+              return;
+            }
+
+            commitAndSaveBookmark = false;
+          }
 
 Review comment:
   I suggest extracting the commit logic to avoid code duplication. This way the final commit would implicitly also be measured and its duration logged.
   Something like:
   ```
   if (batch_commit_size_ != 0 && eventCount % batch_commit_size_ == 0) {
     const auto success_is_full = this->commit(session, bookmarkXml);
     if (success_is_full) { return; }
   }
   ```
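  A sketch of such an extracted helper, assuming a hypothetical minimal `Session` type and a caller-supplied bookmark-saving callback (not the processor's actual members).

```cpp
#include <cassert>
#include <chrono>
#include <string>
#include <vector>

// Hypothetical minimal session: counts commits and reports whether the
// "success" relationship is backed up.
struct Session {
  int commits = 0;
  bool successFull = false;
  void commit() { ++commits; }
  bool outgoingConnectionsFull(const std::string&) const { return successFull; }
};

// One helper for both the per-batch commit and the final commit, so both
// are timed and logged the same way. Returns true when the caller should
// stop because the "success" connection is full.
template <typename SaveBookmark>
bool commitAndCheckFull(Session& session, SaveBookmark saveBookmark,
                        std::vector<std::string>& messages) {
  const auto start = std::chrono::steady_clock::now();
  session.commit();
  const auto elapsedMs = std::chrono::duration_cast<std::chrono::milliseconds>(
      std::chrono::steady_clock::now() - start).count();
  messages.push_back("commit took " + std::to_string(elapsedMs) + " ms");
  saveBookmark();  // only reached if commit() did not throw
  return session.outgoingConnectionsFull("success");
}
```

  The same helper would be called both inside the batch check and once after the loop, so the final commit is timed and logged too.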

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r383920888
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.h
 ##########
 @@ -101,14 +100,11 @@ class ConsumeWindowsEventLog : public core::Processor
   
 
 protected:
-  bool subscribe(const std::shared_ptr<core::ProcessContext> &context);
-  void unsubscribe();
-  int processQueue(const std::shared_ptr<core::ProcessSession> &session);
+  void putEventRenderFlowFileToSession(const EventRender& eventRender, core::ProcessSession& session);
   wel::WindowsEventLogHandler getEventLogHandler(const std::string & name);
   bool insertHeaderName(wel::METADATA_NAMES &header, const std::string &key, const std::string &value);
   void LogWindowsError();
-  void processEvent(EVT_HANDLE eventHandle);
-  bool processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring& channel, const std::wstring& query);
+  bool createEventRender(EVT_HANDLE eventHandle, EventRender& eventRender);
 
 Review comment:
   A small suggestion: "Throw an exception to signal that a function can’t perform its assigned task" (instead of returning success/failure `bool`)
   This way `eventRender` can become the return value, making the function more intuitive to use.
   
   http://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#e2-throw-an-exception-to-signal-that-a-function-cant-perform-its-assigned-task
   
   https://stlab.cc/tips/stop-using-out-arguments.html (I wish we had `std::optional`)
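  If C++17 is available, a sketch of the `std::optional` variant might look like this; `EventRender` and the failure conditions are simplified, hypothetical stand-ins for the real rendering logic.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>

// Hypothetical, simplified render result.
struct EventRender {
  std::string text;
};

// Returns the rendered event, or std::nullopt when rendering fails
// (here: empty input or input larger than maxBufferSize), instead of
// filling an out-parameter and returning bool.
std::optional<EventRender> createEventRender(const std::string& rawXml,
                                             std::size_t maxBufferSize) {
  if (rawXml.empty() || rawXml.size() > maxBufferSize) {
    return std::nullopt;
  }
  return EventRender{rawXml};
}
```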

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r386555753
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,19 +280,88 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  struct TimeDiff {
+    auto operator()() const {
+      return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - time_).count();
+    }
+    const decltype(std::chrono::steady_clock::now()) time_ = std::chrono::steady_clock::now();
+  };
+
+  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml) {
+    const TimeDiff timeDiff;
+    session->commit();
+    logger_->log_debug("processQueue commit took %" PRIu64 " ms", (uint64_t)timeDiff());
 
 Review comment:
   Shouldn't be the case since you've changed the function to return `int64_t`
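  A sketch of the alternative implied here, assuming the elapsed-time helper returns `std::int64_t` and the logger follows `printf` conventions: use `PRId64` and pass the value without a cast. `formatCommitDuration` is hypothetical.

```cpp
#include <cassert>
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <string>

// PRId64 matches std::int64_t, so no (uint64_t) cast is needed.
std::string formatCommitDuration(std::int64_t elapsedMs) {
  char buf[64];
  std::snprintf(buf, sizeof(buf), "processQueue commit took %" PRId64 " ms", elapsedMs);
  return buf;
}
```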

[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r385565232
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.h
 ##########
 @@ -101,14 +100,11 @@ class ConsumeWindowsEventLog : public core::Processor
   
 
 protected:
-  bool subscribe(const std::shared_ptr<core::ProcessContext> &context);
-  void unsubscribe();
-  int processQueue(const std::shared_ptr<core::ProcessSession> &session);
+  void putEventRenderFlowFileToSession(const EventRender& eventRender, core::ProcessSession& session);
   wel::WindowsEventLogHandler getEventLogHandler(const std::string & name);
   bool insertHeaderName(wel::METADATA_NAMES &header, const std::string &key, const std::string &value);
   void LogWindowsError();
-  void processEvent(EVT_HANDLE eventHandle);
-  bool processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring& channel, const std::wstring& query);
+  bool createEventRender(EVT_HANDLE eventHandle, EventRender& eventRender);
 
 Review comment:
  Agreed, but since it is minor, I will leave it as it is.

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r386386213
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,19 +280,88 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  struct TimeDiff {
+    auto operator()() const {
+      return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - time_).count();
+    }
+    const decltype(std::chrono::steady_clock::now()) time_ = std::chrono::steady_clock::now();
+  };
+
+  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml) {
+    const TimeDiff timeDiff;
+    session->commit();
+    logger_->log_debug("processQueue commit took %" PRIu64 " ms", (uint64_t)timeDiff());
+
+    pBookmark_->saveBookmarkXml(bookmarkXml);
+
+    if (session->outgoingConnectionsFull("success")) {
+      logger_->log_debug("outgoingConnectionsFull");
+      return false;
+    }
+
+    return true;
+  };
+
+  size_t eventCount = 0;
+  const TimeDiff timeDiff;
+  utils::ScopeGuard timeGuard([&]() {
+    logger_->log_debug("processed %zu Events in %"  PRIu64 " ms", eventCount, (uint64_t)timeDiff());
+  });
 
-  const auto now = GetTickCount64();
+  size_t commitAndSaveBookmarkCount = 0;
+  std::wstring bookmarkXml;
 
-  if (flowFileCount > 0) {
-    lastActivityTimestamp_ = now;
+  const auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
+  if (!hEventResults) {
+    LOG_LAST_ERROR(EvtQuery);
+    return;
+  }
+  const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults); });
+
+  auto hBookmark = pBookmark_->getBookmarkHandleFromXML();
+  if (!hBookmark) {
+    // Unrecovarable error.
+    pBookmark_.reset();
+    return;
+  }
+
+  if (!EvtSeek(hEventResults, 1, hBookmark, 0, EvtSeekRelativeToBookmark)) {
+    LOG_LAST_ERROR(EvtSeek);
+    return;
   }
-  else if (inactiveDurationToReconnect_ > 0) {
-    if ((now - lastActivityTimestamp_) > inactiveDurationToReconnect_) {
-      logger_->log_info("Exceeds configured 'inactive duration to reconnect' %lld ms. Unsubscribe to reconnect..", inactiveDurationToReconnect_);
-      unsubscribe();
+
+  // Enumerate the events in the result set after the bookmarked event.
+  while (true) {
+    EVT_HANDLE hEvent{};
+    DWORD dwReturned{};
+    if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
+      if (ERROR_NO_MORE_ITEMS != GetLastError()) {
+        LOG_LAST_ERROR(EvtNext);
+      }
+      break;
+    }
+    const utils::ScopeGuard guard_hEvent([hEvent]() { EvtClose(hEvent); });
+
+    EventRender eventRender;
+    std::wstring newBookmarkXml;
+    if (createEventRender(hEvent, eventRender) && pBookmark_->getNewBookmarkXml(hEvent, newBookmarkXml)) {
+      bookmarkXml = std::move(newBookmarkXml);
+      eventCount++;
+      putEventRenderFlowFileToSession(eventRender, *session);
+
+      if (batch_commit_size_ != 0U && (eventCount % batch_commit_size_ == 0)) {
+        if (!commitAndSaveBookmark(bookmarkXml)) {
+          return;
+        }
+
+        commitAndSaveBookmarkCount = eventCount;
+      }
     }
   }
+
+  if (eventCount > commitAndSaveBookmarkCount) {
+    commitAndSaveBookmark(bookmarkXml);
 
 Review comment:
  Would it be possible to commit only once per `onTrigger()` call, for performance reasons? I had a thread open about this, but it was closed with a seemingly unrelated note.

[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r386509949
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,19 +280,88 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  struct TimeDiff {
+    auto operator()() const {
 
 Review comment:
  Using `uint64_t{ timeDiff() }` produces a `narrowing conversion` error, but returning `uint64_t` instead of `auto` compiles without error; the two seem equivalent.
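  The two are not quite the same. Brace initialization forbids narrowing, and converting a non-constant signed `int64_t` to `uint64_t` is narrowing because negative values cannot be represented; a `return` statement performs ordinary copy-initialization, which converts implicitly and wraps negative values modulo 2^64. A sketch with hypothetical helper names:

```cpp
#include <cassert>
#include <cstdint>

// Brace initialization rejects this at compile time:
//   std::int64_t ms = 5;
//   std::uint64_t bad{ms};  // error: narrowing conversion
// A return statement converts implicitly and silently wraps negatives.
std::uint64_t toUnsignedViaReturn(std::int64_t ms) {
  return ms;
}

// Hypothetical checked alternative: clamps negative durations to 0.
std::uint64_t toUnsignedChecked(std::int64_t ms) {
  return ms < 0 ? 0 : static_cast<std::uint64_t>(ms);
}
```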
   

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r386303469
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -391,322 +463,122 @@ void ConsumeWindowsEventLog::substituteXMLPercentageItems(pugi::xml_document& do
   doc.traverse(treeWalker);
 }
 
-void ConsumeWindowsEventLog::processEvent(EVT_HANDLE hEvent) {
+bool ConsumeWindowsEventLog::createEventRender(EVT_HANDLE hEvent, EventRender& eventRender) {
   DWORD size = 0;
   DWORD used = 0;
   DWORD propertyCount = 0;
-  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount)) {
-    if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) {
-      if (used > maxBufferSize_) {
-        logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
-        return;
-      }
-
-      size = used;
-      std::vector<wchar_t> buf(size / 2 + 1);
-      if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
-        logger_->log_error("!EvtRender error: %d.", GetLastError());
-        return;
-      }
-
-      std::string xml = wel::to_string(&buf[0]);
-
-      pugi::xml_document doc;
-      pugi::xml_parse_result result = doc.load_string(xml.c_str());
-
-      if (!result) {
-        logger_->log_error("Invalid XML produced");
-        return;
-      }
-      // this is a well known path. 
-      std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
-      wel::MetadataWalker walker(getEventLogHandler(providerName).getMetadata(), channel_, hEvent, !resolve_as_attributes_, apply_identifier_function_, regex_);
-
-      // resolve the event metadata
-      doc.traverse(walker);
-
-      EventRender renderedData;
-
-      if (writePlainText_) {
-        auto handler = getEventLogHandler(providerName);
-        auto message = handler.getEventMessage(hEvent);
-
-        if (!message.empty()) {
-
-          for (const auto &mapEntry : walker.getIdentifiers()) {
-            // replace the identifiers with their translated strings.
-            utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
-          }
-          wel::WindowsEventLogHeader log_header(header_names_);
-          // set the delimiter
-          log_header.setDelimiter(header_delimiter_);
-          // render the header.
-          renderedData.rendered_text_ = log_header.getEventHeader(&walker);
-          renderedData.rendered_text_ += "Message" + header_delimiter_ + " ";
-          renderedData.rendered_text_ += message;
-        }
-      }
-
-      if (writeXML_) {
-        substituteXMLPercentageItems(doc);
-
-        if (resolve_as_attributes_) {
-          renderedData.matched_fields_ = walker.getFieldValues();
-        }
-
-        wel::XmlString writer;
-        doc.print(writer, "", pugi::format_raw); // no indentation or formatting
-        xml = writer.xml_;
-
-        renderedData.text_ = std::move(xml);
-      }
-
-      if (pBookmark_) {
-        std::wstring bookmarkXml;
-        if (pBookmark_->getNewBookmarkXml(hEvent, bookmarkXml)) {
-          renderedData.bookmarkXml_ = bookmarkXml;
-        }
-      }
-
-      listRenderedData_.enqueue(std::move(renderedData));
-    }
+  EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount);
+  if (ERROR_INSUFFICIENT_BUFFER != GetLastError()) {
+    logger_->log_error("!EvtRender error %d.", GetLastError());
+    return false;
   }
-}
 
-bool ConsumeWindowsEventLog::processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring& channel, const std::wstring& query) {
-  if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark)) {
-    logger_->log_error("!EvtSeek error %d.", GetLastError());
+  if (used > maxBufferSize_) {
+    logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
     return false;
   }
 
-  // Enumerate the events in the result set after the bookmarked event.
-  while (true) {
-    EVT_HANDLE hEvent{};
-    DWORD dwReturned{};
-    if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
-      DWORD status = ERROR_SUCCESS;
-      if (ERROR_NO_MORE_ITEMS != (status = GetLastError())) {
-        logger_->log_error("!EvtNext error %d.", status);
-      }
-      break;
-    }
-
-    processEvent(hEvent);
-
-    EvtClose(hEvent);
+  size = used;
+  std::vector<wchar_t> buf(size / 2 + 1);
+  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
+    logger_->log_error("!EvtRender error: %d.", GetLastError());
 
 Review comment:
   @amarmer `Bookmark.cpp` is still full of it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r385160801
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,19 +279,84 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml){
+    const auto before_commit = std::chrono::high_resolution_clock::now();
+    session->commit();
+    logger_->log_debug("processQueue commit took %llu ms",
+      std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_commit).count());
+
+    pBookmark_->saveBookmarkXml(bookmarkXml);
+
+    if (session->outgoingConnectionsFull("success")) {
+      return false;
+    }
+
+    return true;
+  };
 
-  const auto now = GetTickCount64();
+  size_t eventCount = 0;
+  const auto before_time = std::chrono::high_resolution_clock::now();
+  utils::ScopeGuard timeGuard([&]() {
+    logger_->log_debug("processed %d Events in %llu ms",
+                       eventCount,
+                       std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - before_time).count());
 
 Review comment:
   @am-c-p-p you have changed only this single instance from `high_resolution_clock` to `steady_clock`. Subtracting a time point created by one clock type from a time point created by another is a very bad idea.
   Please change *all* instances of `high_resolution_clock` to `steady_clock`.
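   A minimal sketch of the requested pattern, using a hypothetical `ScopedTimer` class rather than the project's `utils::ScopeGuard`: both time points come from `std::chrono::steady_clock`, which is monotonic. `high_resolution_clock` may be an alias of `system_clock` or `steady_clock` depending on the standard library, so mixing it with `steady_clock` either fails to compile or only works because of how that library happens to define the alias.

```cpp
#include <chrono>
#include <functional>
#include <utility>

// Hypothetical RAII timer (not part of MiNiFi). The start and end time points
// both come from steady_clock, so the difference is always well-defined and
// cannot be distorted by wall-clock adjustments.
class ScopedTimer {
 public:
  using Callback = std::function<void(std::chrono::milliseconds)>;

  explicit ScopedTimer(Callback onExit)
      : start_(std::chrono::steady_clock::now()), onExit_(std::move(onExit)) {}

  ScopedTimer(const ScopedTimer&) = delete;
  ScopedTimer& operator=(const ScopedTimer&) = delete;

  ~ScopedTimer() {
    // Same clock on both sides of the subtraction.
    const auto elapsed = std::chrono::steady_clock::now() - start_;
    onExit_(std::chrono::duration_cast<std::chrono::milliseconds>(elapsed));
  }

 private:
  std::chrono::steady_clock::time_point start_;
  Callback onExit_;
};
```

   In the processor the callback would call `logger_->log_debug(...)` with the event count and `ms.count()`.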


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r383741229
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -248,21 +239,119 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
     }
   }
 
-  if (subscriptionHandle_) {
-    logger_->log_error("Processor already subscribed to Event Log, expected cleanup to unsubscribe.");
-  } else {
-    sessionFactory_ = sessionFactory;
+  context->getProperty(Channel.getName(), channel_);
+  wstrChannel_ = std::wstring(channel_.begin(), channel_.end());
+
+  std::string query;
+  context->getProperty(Query.getName(), query);
+  wstrQuery_ = std::wstring(query.begin(), query.end());
 
-    subscribe(context);
+  if (!pBookmark_) {
+    std::string bookmarkDir;
+    context->getProperty(BookmarkRootDirectory.getName(), bookmarkDir);
+    if (bookmarkDir.empty()) {
+      logger_->log_error("State Directory is empty");
+      return;
+    }
+    pBookmark_ = std::make_unique<Bookmark>(wstrChannel_, wstrQuery_, bookmarkDir, getUUIDStr(), logger_);
+    if (!*pBookmark_) {
+      pBookmark_.reset();
+      return;
+    }
   }
+
+  context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
+  logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %lld", maxBufferSize_);
+
+  provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query;
 }
 
-void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  if (!subscriptionHandle_) {
-    if (!subscribe(context)) {
-      context->yield();
+void ConsumeWindowsEventLog::processEventsAfterBookmark(core::ProcessSession& session) {
+  // External loop is used in case if there are new events while the events after bookmark are processed.
+  bool hasEvent{};
+  do {
+    hasEvent = false;
+
+    auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
+    if (!hEventResults) {
+      logger_->log_error("!EvtQuery error: %d.", GetLastError());
       return;
     }
+    const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults); });
+
+    if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark)) {
+      logger_->log_error("!EvtSeek error: %d.", GetLastError());
+      return;
+    }
 
 Review comment:
   The only common code is `EvtQuery(0, channel.c_str(), query.c_str(), EvtQueryChannelPath)`.


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r383268546
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -248,21 +239,119 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
     }
   }
 
-  if (subscriptionHandle_) {
-    logger_->log_error("Processor already subscribed to Event Log, expected cleanup to unsubscribe.");
-  } else {
-    sessionFactory_ = sessionFactory;
+  context->getProperty(Channel.getName(), channel_);
+  wstrChannel_ = std::wstring(channel_.begin(), channel_.end());
+
+  std::string query;
+  context->getProperty(Query.getName(), query);
+  wstrQuery_ = std::wstring(query.begin(), query.end());
 
-    subscribe(context);
+  if (!pBookmark_) {
+    std::string bookmarkDir;
+    context->getProperty(BookmarkRootDirectory.getName(), bookmarkDir);
+    if (bookmarkDir.empty()) {
+      logger_->log_error("State Directory is empty");
+      return;
+    }
+    pBookmark_ = std::make_unique<Bookmark>(wstrChannel_, wstrQuery_, bookmarkDir, getUUIDStr(), logger_);
+    if (!*pBookmark_) {
+      pBookmark_.reset();
+      return;
+    }
   }
+
+  context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
+  logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %lld", maxBufferSize_);
+
+  provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query;
 }
 
-void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  if (!subscriptionHandle_) {
-    if (!subscribe(context)) {
-      context->yield();
+void ConsumeWindowsEventLog::processEventsAfterBookmark(core::ProcessSession& session) {
+  // External loop is used in case if there are new events while the events after bookmark are processed.
+  bool hasEvent{};
+  do {
+    hasEvent = false;
+
+    auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
+    if (!hEventResults) {
+      logger_->log_error("!EvtQuery error: %d.", GetLastError());
       return;
     }
+    const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults); });
+
+    if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark)) {
+      logger_->log_error("!EvtSeek error: %d.", GetLastError());
+      return;
+    }
+
+    size_t eventCount = 0;
+    auto before_time = std::chrono::high_resolution_clock::now();
+    utils::ScopeGuard timeGuard([&]() {
+      logger_->log_debug("processed %d Events in %llu ms",
+                         eventCount,
+                         std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_time).count());
+    });
+
+    // Enumerate the events in the result set after the bookmarked event.
+    bool commitAndSaveBookmark{};
+    std::wstring bookmarkXml;
+    while (true) {
+      EVT_HANDLE hEvent{};
+      DWORD dwReturned{};
+      if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
+        DWORD status = ERROR_SUCCESS;
+        if (ERROR_NO_MORE_ITEMS != (status = GetLastError())) {
+          logger_->log_error("!EvtNext error %d.", status);
+        }
+        break;
+      }
+      const utils::ScopeGuard guard_hEvent([hEvent]() { EvtClose(hEvent); });
+
+      commitAndSaveBookmark = false;
+      EventRender renderedData;
+      if (processEvent(hEvent, renderedData)) {
+        if (pBookmark_->getNewBookmarkXml(hEvent, bookmarkXml)) {
 
 Review comment:
   Consider using operator `&&` instead of nested `if`s
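   A small sketch of the suggestion, with a hypothetical `Steps` struct whose `render()` and `bookmark()` stand in for `processEvent()` and `getNewBookmarkXml()`. Because `&&` short-circuits, the second call runs only when the first one succeeds, exactly as with the nested `if`s.

```cpp
#include <string>

// Hypothetical stand-ins for processEvent() and Bookmark::getNewBookmarkXml();
// they record how often they are called so the control flow can be observed.
struct Steps {
  int renderCalls = 0;
  int bookmarkCalls = 0;
  bool renderOk = true;
  bool bookmarkOk = true;

  bool render(std::string& out) {
    ++renderCalls;
    if (renderOk) out = "<Event/>";
    return renderOk;
  }
  bool bookmark(std::wstring& xml) {
    ++bookmarkCalls;
    if (bookmarkOk) xml = L"<BookmarkList/>";
    return bookmarkOk;
  }
};

// Equivalent to: if (render(...)) { if (bookmark(...)) { ... } }
// bookmark() is evaluated only if render() returned true.
bool handleEvent(Steps& s, std::string& rendered, std::wstring& bookmarkXml) {
  if (s.render(rendered) && s.bookmark(bookmarkXml)) {
    return true;  // both succeeded: count the event, emit the flow file, etc.
  }
  return false;
}
```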


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r383783881
 
 

 ##########
 File path: extensions/windows-event-log/Bookmark.cpp
 ##########
 @@ -111,8 +112,8 @@ bool Bookmark::getNewBookmarkXml(EVT_HANDLE hEvent, std::wstring& bookmarkXml) {
       bookmarkXml = &buf[0];
 
       return true;
-    }
-    else if (ERROR_SUCCESS != (status = GetLastError())) {
+    } 
 
 Review comment:
   Fixed.


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r383331881
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -248,21 +239,119 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
     }
   }
 
-  if (subscriptionHandle_) {
-    logger_->log_error("Processor already subscribed to Event Log, expected cleanup to unsubscribe.");
-  } else {
-    sessionFactory_ = sessionFactory;
+  context->getProperty(Channel.getName(), channel_);
+  wstrChannel_ = std::wstring(channel_.begin(), channel_.end());
+
+  std::string query;
+  context->getProperty(Query.getName(), query);
+  wstrQuery_ = std::wstring(query.begin(), query.end());
 
-    subscribe(context);
+  if (!pBookmark_) {
+    std::string bookmarkDir;
+    context->getProperty(BookmarkRootDirectory.getName(), bookmarkDir);
+    if (bookmarkDir.empty()) {
+      logger_->log_error("State Directory is empty");
+      return;
+    }
+    pBookmark_ = std::make_unique<Bookmark>(wstrChannel_, wstrQuery_, bookmarkDir, getUUIDStr(), logger_);
+    if (!*pBookmark_) {
+      pBookmark_.reset();
+      return;
+    }
   }
+
+  context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
+  logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %lld", maxBufferSize_);
+
+  provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query;
 }
 
-void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  if (!subscriptionHandle_) {
-    if (!subscribe(context)) {
-      context->yield();
+void ConsumeWindowsEventLog::processEventsAfterBookmark(core::ProcessSession& session) {
+  // External loop is used in case if there are new events while the events after bookmark are processed.
+  bool hasEvent{};
+  do {
+    hasEvent = false;
+
+    auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
+    if (!hEventResults) {
+      logger_->log_error("!EvtQuery error: %d.", GetLastError());
       return;
     }
+    const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults); });
+
+    if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark)) {
+      logger_->log_error("!EvtSeek error: %d.", GetLastError());
+      return;
+    }
+
+    size_t eventCount = 0;
+    auto before_time = std::chrono::high_resolution_clock::now();
+    utils::ScopeGuard timeGuard([&]() {
+      logger_->log_debug("processed %d Events in %llu ms",
+                         eventCount,
+                         std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_time).count());
+    });
+
+    // Enumerate the events in the result set after the bookmarked event.
+    bool commitAndSaveBookmark{};
+    std::wstring bookmarkXml;
+    while (true) {
+      EVT_HANDLE hEvent{};
+      DWORD dwReturned{};
+      if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
+        DWORD status = ERROR_SUCCESS;
+        if (ERROR_NO_MORE_ITEMS != (status = GetLastError())) {
+          logger_->log_error("!EvtNext error %d.", status);
+        }
+        break;
+      }
+      const utils::ScopeGuard guard_hEvent([hEvent]() { EvtClose(hEvent); });
+
+      commitAndSaveBookmark = false;
+      EventRender renderedData;
+      if (processEvent(hEvent, renderedData)) {
+        if (pBookmark_->getNewBookmarkXml(hEvent, bookmarkXml)) {
+          commitAndSaveBookmark = true;
+
+          eventCount++;
+          processEventRender(renderedData, session);
+
+          if (batch_commit_size_ != 0U && (eventCount % batch_commit_size_ == 0)) {
+            auto before_commit = std::chrono::high_resolution_clock::now();
+            session.commit();
+            logger_->log_debug("processQueue commit took %llu ms",
+                               std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_commit).count());
+
+            pBookmark_->saveBookmarkXml(bookmarkXml);
+
+            if (session.outgoingConnectionsFull("success")) {
+              return;
+            }
+
+            commitAndSaveBookmark = false;
+          }
+        }
+      }
+
+      hasEvent = true;
+    }
+
+    if (commitAndSaveBookmark) {
+      session.commit();
 
 Review comment:
   Can we postpone `commit()` until after the outermost loop, so that it runs only once per `onTrigger()` call unless `batch_commit_size_` specifies otherwise? This is for performance reasons.
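   A rough sketch of the proposed commit policy, assuming a hypothetical `FakeSession` in place of `core::ProcessSession` and plain integers in place of event handles: commit every `batchCommitSize` events when it is nonzero, and otherwise commit once after the loop for whatever is still pending.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical stand-in for core::ProcessSession that only counts commits.
struct FakeSession {
  int commits = 0;
  void commit() { ++commits; }
};

// Processes all events of one onTrigger() call. Commits every batchCommitSize
// events when batchCommitSize != 0, then once more after the loop if anything
// is still uncommitted. In the processor, the bookmark would be saved right
// after each successful commit.
std::size_t processAll(const std::vector<int>& events, std::size_t batchCommitSize,
                       FakeSession& session) {
  std::size_t eventCount = 0;
  bool pending = false;
  for (int event : events) {
    (void)event;  // render the event and put a flow file into the session here
    ++eventCount;
    pending = true;
    if (batchCommitSize != 0 && eventCount % batchCommitSize == 0) {
      session.commit();
      pending = false;
    }
  }
  if (pending) {
    session.commit();  // a single commit per onTrigger() when no batch size is set
  }
  return eventCount;
}
```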


[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r385570840
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -391,322 +463,122 @@ void ConsumeWindowsEventLog::substituteXMLPercentageItems(pugi::xml_document& do
   doc.traverse(treeWalker);
 }
 
-void ConsumeWindowsEventLog::processEvent(EVT_HANDLE hEvent) {
+bool ConsumeWindowsEventLog::createEventRender(EVT_HANDLE hEvent, EventRender& eventRender) {
   DWORD size = 0;
   DWORD used = 0;
   DWORD propertyCount = 0;
-  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount)) {
-    if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) {
-      if (used > maxBufferSize_) {
-        logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
-        return;
-      }
-
-      size = used;
-      std::vector<wchar_t> buf(size / 2 + 1);
-      if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
-        logger_->log_error("!EvtRender error: %d.", GetLastError());
-        return;
-      }
-
-      std::string xml = wel::to_string(&buf[0]);
-
-      pugi::xml_document doc;
-      pugi::xml_parse_result result = doc.load_string(xml.c_str());
-
-      if (!result) {
-        logger_->log_error("Invalid XML produced");
-        return;
-      }
-      // this is a well known path. 
-      std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
-      wel::MetadataWalker walker(getEventLogHandler(providerName).getMetadata(), channel_, hEvent, !resolve_as_attributes_, apply_identifier_function_, regex_);
-
-      // resolve the event metadata
-      doc.traverse(walker);
-
-      EventRender renderedData;
-
-      if (writePlainText_) {
-        auto handler = getEventLogHandler(providerName);
-        auto message = handler.getEventMessage(hEvent);
-
-        if (!message.empty()) {
-
-          for (const auto &mapEntry : walker.getIdentifiers()) {
-            // replace the identifiers with their translated strings.
-            utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
-          }
-          wel::WindowsEventLogHeader log_header(header_names_);
-          // set the delimiter
-          log_header.setDelimiter(header_delimiter_);
-          // render the header.
-          renderedData.rendered_text_ = log_header.getEventHeader(&walker);
-          renderedData.rendered_text_ += "Message" + header_delimiter_ + " ";
-          renderedData.rendered_text_ += message;
-        }
-      }
-
-      if (writeXML_) {
-        substituteXMLPercentageItems(doc);
-
-        if (resolve_as_attributes_) {
-          renderedData.matched_fields_ = walker.getFieldValues();
-        }
-
-        wel::XmlString writer;
-        doc.print(writer, "", pugi::format_raw); // no indentation or formatting
-        xml = writer.xml_;
-
-        renderedData.text_ = std::move(xml);
-      }
-
-      if (pBookmark_) {
-        std::wstring bookmarkXml;
-        if (pBookmark_->getNewBookmarkXml(hEvent, bookmarkXml)) {
-          renderedData.bookmarkXml_ = bookmarkXml;
-        }
-      }
-
-      listRenderedData_.enqueue(std::move(renderedData));
-    }
+  EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount);
+  if (ERROR_INSUFFICIENT_BUFFER != GetLastError()) {
+    logger_->log_error("!EvtRender error %d.", GetLastError());
+    return false;
   }
-}
 
-bool ConsumeWindowsEventLog::processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring& channel, const std::wstring& query) {
-  if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark)) {
-    logger_->log_error("!EvtSeek error %d.", GetLastError());
+  if (used > maxBufferSize_) {
+    logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
 
 Review comment:
   @szaszm I agree that this is the completely correct solution, but as I said before on another PR, I don't see much point in selectively using inttypes macros in random parts of the code when most of it does not use them. We should refactor the code to change every instance to the proper macro and then use them consistently.
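   For reference, this is roughly what the inttypes-based version would look like, assuming `maxBufferSize_` is a `uint64_t` (`formatDropMessage` is a hypothetical helper; the real code passes the format string to `logger_->log_error`). The `%ll` in the original message is not a complete conversion specification; `PRIu64` expands to the correct one for `uint64_t` on each platform.

```cpp
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <string>

// Hypothetical helper: formats the "dropping event" message with a portable
// conversion specifier for uint64_t instead of the incomplete "%ll".
std::string formatDropMessage(std::uint64_t maxBufferSize) {
  char buf[128];
  std::snprintf(buf, sizeof(buf),
                "Dropping event because it couldn't be rendered within %" PRIu64 " bytes.",
                maxBufferSize);
  return std::string(buf);
}
```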


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r383784091
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -248,21 +239,119 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
     }
   }
 
-  if (subscriptionHandle_) {
-    logger_->log_error("Processor already subscribed to Event Log, expected cleanup to unsubscribe.");
-  } else {
-    sessionFactory_ = sessionFactory;
+  context->getProperty(Channel.getName(), channel_);
+  wstrChannel_ = std::wstring(channel_.begin(), channel_.end());
+
+  std::string query;
+  context->getProperty(Query.getName(), query);
+  wstrQuery_ = std::wstring(query.begin(), query.end());
 
-    subscribe(context);
+  if (!pBookmark_) {
+    std::string bookmarkDir;
+    context->getProperty(BookmarkRootDirectory.getName(), bookmarkDir);
+    if (bookmarkDir.empty()) {
+      logger_->log_error("State Directory is empty");
+      return;
+    }
+    pBookmark_ = std::make_unique<Bookmark>(wstrChannel_, wstrQuery_, bookmarkDir, getUUIDStr(), logger_);
+    if (!*pBookmark_) {
+      pBookmark_.reset();
+      return;
+    }
   }
+
+  context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
+  logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %lld", maxBufferSize_);
+
+  provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query;
 }
 
-void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  if (!subscriptionHandle_) {
-    if (!subscribe(context)) {
-      context->yield();
+void ConsumeWindowsEventLog::processEventsAfterBookmark(core::ProcessSession& session) {
+  // External loop is used in case if there are new events while the events after bookmark are processed.
+  bool hasEvent{};
+  do {
+    hasEvent = false;
+
+    auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
+    if (!hEventResults) {
+      logger_->log_error("!EvtQuery error: %d.", GetLastError());
       return;
     }
+    const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults); });
+
+    if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark)) {
+      logger_->log_error("!EvtSeek error: %d.", GetLastError());
+      return;
+    }
+
+    size_t eventCount = 0;
+    auto before_time = std::chrono::high_resolution_clock::now();
+    utils::ScopeGuard timeGuard([&]() {
+      logger_->log_debug("processed %d Events in %llu ms",
+                         eventCount,
+                         std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_time).count());
+    });
+
+    // Enumerate the events in the result set after the bookmarked event.
+    bool commitAndSaveBookmark{};
+    std::wstring bookmarkXml;
+    while (true) {
+      EVT_HANDLE hEvent{};
+      DWORD dwReturned{};
+      if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
+        DWORD status = ERROR_SUCCESS;
+        if (ERROR_NO_MORE_ITEMS != (status = GetLastError())) {
+          logger_->log_error("!EvtNext error %d.", status);
+        }
+        break;
+      }
+      const utils::ScopeGuard guard_hEvent([hEvent]() { EvtClose(hEvent); });
+
+      commitAndSaveBookmark = false;
+      EventRender renderedData;
+      if (processEvent(hEvent, renderedData)) {
+        if (pBookmark_->getNewBookmarkXml(hEvent, bookmarkXml)) {
 
 Review comment:
   Fixed.


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r383243827
 
 

 ##########
 File path: extensions/windows-event-log/Bookmark.cpp
 ##########
 @@ -111,8 +112,8 @@ bool Bookmark::getNewBookmarkXml(EVT_HANDLE hEvent, std::wstring& bookmarkXml) {
       bookmarkXml = &buf[0];
 
       return true;
-    }
-    else if (ERROR_SUCCESS != (status = GetLastError())) {
+    } 
 
 Review comment:
   Trailing whitespace at the end of the line.


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r385564613
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,19 +279,84 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml){
+    const auto before_commit = std::chrono::high_resolution_clock::now();
+    session->commit();
+    logger_->log_debug("processQueue commit took %llu ms",
+      std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_commit).count());
+
+    pBookmark_->saveBookmarkXml(bookmarkXml);
+
+    if (session->outgoingConnectionsFull("success")) {
+      return false;
+    }
+
+    return true;
+  };
 
-  const auto now = GetTickCount64();
+  size_t eventCount = 0;
+  const auto before_time = std::chrono::high_resolution_clock::now();
+  utils::ScopeGuard timeGuard([&]() {
+    logger_->log_debug("processed %d Events in %llu ms",
 
 Review comment:
   Fixed.


[GitHub] [nifi-minifi-cpp] bakaid commented on issue #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
bakaid commented on issue #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#issuecomment-592053132
 
 
   I ran some backpressure tests with a CWEL -> S2S-to-NiFi flow. It seems to handle backpressure without a significant memory increase, and it resumes event collection after the backpressure subsides.


[GitHub] [nifi-minifi-cpp] bakaid commented on issue #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
bakaid commented on issue #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#issuecomment-592585920
 
 
   @am-c-p-p please let us know in a comment when you are ready with the review fixes.


[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r386509949
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,19 +280,88 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  struct TimeDiff {
+    auto operator()() const {
 
 Review comment:
   What is the difference between `(uint64_t)timeDiff()` and `uint64_t{ timeDiff() }`?
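   A sketch of the difference, assuming `timeDiff()` returns `std::chrono::milliseconds::rep` (a signed integer type), as a `duration_cast<...>(...).count()` would; `elapsedMs` and `toUint64` are hypothetical helpers. In short, the C-style cast always compiles, while the braced form is ill-formed here because list-initialization forbids the narrowing signed-to-unsigned conversion.

```cpp
#include <chrono>
#include <cstdint>
#include <type_traits>

using MsRep = std::chrono::milliseconds::rep;

// The standard requires milliseconds::rep to be a signed integer type, so
// converting it to uint64_t is a narrowing conversion.
static_assert(std::is_signed<MsRep>::value, "milliseconds::rep is expected to be signed");

// Hypothetical helper mirroring the TimeDiff functor: elapsed ms since `start`.
MsRep elapsedMs(std::chrono::steady_clock::time_point start) {
  return std::chrono::duration_cast<std::chrono::milliseconds>(
             std::chrono::steady_clock::now() - start).count();
}

std::uint64_t toUint64(MsRep ms) {
  // (std::uint64_t)ms   : compiles silently; a C-style cast may fall back to
  //                       static_cast, const_cast or reinterpret_cast.
  // std::uint64_t{ ms } : ill-formed, list-initialization forbids narrowing from
  //                       a non-constant signed value (a diagnostic is required).
  // static_cast states the intent and performs exactly this value conversion.
  return static_cast<std::uint64_t>(ms);
}
```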
   


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r383920888
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.h
 ##########
 @@ -101,14 +100,11 @@ class ConsumeWindowsEventLog : public core::Processor
   
 
 protected:
-  bool subscribe(const std::shared_ptr<core::ProcessContext> &context);
-  void unsubscribe();
-  int processQueue(const std::shared_ptr<core::ProcessSession> &session);
+  void putEventRenderFlowFileToSession(const EventRender& eventRender, core::ProcessSession& session);
   wel::WindowsEventLogHandler getEventLogHandler(const std::string & name);
   bool insertHeaderName(wel::METADATA_NAMES &header, const std::string &key, const std::string &value);
   void LogWindowsError();
-  void processEvent(EVT_HANDLE eventHandle);
-  bool processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring& channel, const std::wstring& query);
+  bool createEventRender(EVT_HANDLE eventHandle, EventRender& eventRender);
 
 Review comment:
   A small suggestion: "Throw an exception to signal that a function can’t perform its assigned task" (instead of returning success/failure `bool`)
   This way `eventRender` can become the return value, making the function more intuitive to use.
   
   http://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#e2-throw-an-exception-to-signal-that-a-function-cant-perform-its-assigned-task
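A minimal sketch of the suggested shape. `RawEvent` and the reduced `EventRender` are hypothetical stand-ins for `EVT_HANDLE` and the real struct. The exception type (`std::runtime_error`) is also an assumption.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-ins; the real code renders an EVT_HANDLE with EvtRender.
struct EventRender {
  std::string text_;
};

struct RawEvent {
  bool renderable;
  std::string xml;
};

// Instead of `bool createEventRender(EVT_HANDLE, EventRender&)`, return the
// result and throw when the event cannot be rendered.
EventRender createEventRender(const RawEvent& event) {
  if (!event.renderable) {
    throw std::runtime_error("EvtRender failed");
  }
  return EventRender{event.xml};
}

// The caller decides what to do with an event that cannot be rendered.
std::vector<EventRender> renderAll(const std::vector<RawEvent>& events) {
  std::vector<EventRender> result;
  for (const auto& event : events) {
    try {
      result.push_back(createEventRender(event));
    } catch (const std::runtime_error&) {
      // The processor would log the error and skip the event here.
    }
  }
  return result;
}
```

With this shape, the out-parameter disappears and the success path reads as a plain function call.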

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r385134259
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -391,322 +463,122 @@ void ConsumeWindowsEventLog::substituteXMLPercentageItems(pugi::xml_document& do
   doc.traverse(treeWalker);
 }
 
-void ConsumeWindowsEventLog::processEvent(EVT_HANDLE hEvent) {
+bool ConsumeWindowsEventLog::createEventRender(EVT_HANDLE hEvent, EventRender& eventRender) {
   DWORD size = 0;
   DWORD used = 0;
   DWORD propertyCount = 0;
-  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount)) {
-    if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) {
-      if (used > maxBufferSize_) {
-        logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
-        return;
-      }
-
-      size = used;
-      std::vector<wchar_t> buf(size / 2 + 1);
-      if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
-        logger_->log_error("!EvtRender error: %d.", GetLastError());
-        return;
-      }
-
-      std::string xml = wel::to_string(&buf[0]);
-
-      pugi::xml_document doc;
-      pugi::xml_parse_result result = doc.load_string(xml.c_str());
-
-      if (!result) {
-        logger_->log_error("Invalid XML produced");
-        return;
-      }
-      // this is a well known path. 
-      std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
-      wel::MetadataWalker walker(getEventLogHandler(providerName).getMetadata(), channel_, hEvent, !resolve_as_attributes_, apply_identifier_function_, regex_);
-
-      // resolve the event metadata
-      doc.traverse(walker);
-
-      EventRender renderedData;
-
-      if (writePlainText_) {
-        auto handler = getEventLogHandler(providerName);
-        auto message = handler.getEventMessage(hEvent);
-
-        if (!message.empty()) {
-
-          for (const auto &mapEntry : walker.getIdentifiers()) {
-            // replace the identifiers with their translated strings.
-            utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
-          }
-          wel::WindowsEventLogHeader log_header(header_names_);
-          // set the delimiter
-          log_header.setDelimiter(header_delimiter_);
-          // render the header.
-          renderedData.rendered_text_ = log_header.getEventHeader(&walker);
-          renderedData.rendered_text_ += "Message" + header_delimiter_ + " ";
-          renderedData.rendered_text_ += message;
-        }
-      }
-
-      if (writeXML_) {
-        substituteXMLPercentageItems(doc);
-
-        if (resolve_as_attributes_) {
-          renderedData.matched_fields_ = walker.getFieldValues();
-        }
-
-        wel::XmlString writer;
-        doc.print(writer, "", pugi::format_raw); // no indentation or formatting
-        xml = writer.xml_;
-
-        renderedData.text_ = std::move(xml);
-      }
-
-      if (pBookmark_) {
-        std::wstring bookmarkXml;
-        if (pBookmark_->getNewBookmarkXml(hEvent, bookmarkXml)) {
-          renderedData.bookmarkXml_ = bookmarkXml;
-        }
-      }
-
-      listRenderedData_.enqueue(std::move(renderedData));
-    }
+  EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount);
+  if (ERROR_INSUFFICIENT_BUFFER != GetLastError()) {
+    logger_->log_error("!EvtRender error %d.", GetLastError());
+    return false;
   }
-}
 
-bool ConsumeWindowsEventLog::processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring& channel, const std::wstring& query) {
-  if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark)) {
-    logger_->log_error("!EvtSeek error %d.", GetLastError());
+  if (used > maxBufferSize_) {
+    logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
 
 Review comment:
   `%ll` is an invalid format string; `%` will be printed instead of it. `%llu` should be used, as `maxBufferSize_` is a `uint64_t`.
   Also, `%p` should be used instead of `%x` for printing the handle. `%x` expects a 32-bit unsigned integer, so it will truncate the 64-bit HANDLE to 32 bits. (Even if it didn't, `%x` doesn't display the leading zeroes of the pointer, which is weird.)
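A sketch of the corrected format string. It assumes the handle is passed as a `void*` (on Windows, `EVT_HANDLE` is a pointer-sized handle) and the size as `uint64_t`. It calls `std::snprintf` directly rather than the processor's logger, which takes printf-style format strings as in the quoted code.

```cpp
#include <cassert>
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <string>

// Sketch: builds the "Dropping event" message with portable specifiers.
// `handle` stands in for EVT_HANDLE.
std::string formatDropMessage(void* handle, std::uint64_t maxBufferSize) {
  char buf[160];
  // %p prints the full pointer width; PRIu64 expands to the correct length
  // modifier for uint64_t on each platform.
  std::snprintf(buf, sizeof(buf),
                "Dropping event %p because it couldn't be rendered within %" PRIu64 " bytes.",
                handle, maxBufferSize);
  return buf;
}
```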

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r386391223
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,19 +280,88 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  struct TimeDiff {
+    auto operator()() const {
+      return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - time_).count();
+    }
+    const decltype(std::chrono::steady_clock::now()) time_ = std::chrono::steady_clock::now();
+  };
+
+  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml) {
+    const TimeDiff timeDiff;
+    session->commit();
+    logger_->log_debug("processQueue commit took %" PRIu64 " ms", (uint64_t)timeDiff());
 
 Review comment:
   Please don't use C-style casts! My suggestion about the explicit return value would make it redundant in this case.
   
   http://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#es48-avoid-casts
   http://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#es49-if-you-must-use-a-cast-use-a-named-cast (note: here, we don't need a cast)
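The "explicit return value" suggestion is not part of this excerpt. Assuming it means giving `TimeDiff::operator()` an explicit `uint64_t` return type instead of `auto`, a sketch would look like this. The implicit conversion in the `return` statement replaces the cast at the call site, so `timeDiff()` can be passed to `"%" PRIu64` directly.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Sketch of the PR's TimeDiff helper with an explicit return type (assumption).
struct TimeDiff {
  std::uint64_t operator()() const {
    // steady_clock is monotonic, so the count is non-negative and the implicit
    // signed-to-unsigned conversion in this return statement is value-preserving.
    return std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::steady_clock::now() - time_).count();
  }
  const std::chrono::steady_clock::time_point time_ = std::chrono::steady_clock::now();
};
```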

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] am-c-p-p commented on issue #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on issue #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#issuecomment-593581397
 
 
   New review comments are addressed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r386509949
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,19 +280,88 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  struct TimeDiff {
+    auto operator()() const {
 
 Review comment:
   Is there a difference between `(uint64_t)timeDiff()` and `uint64_t{ timeDiff() }`?
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r385395344
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,19 +279,84 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml){
+    const auto before_commit = std::chrono::high_resolution_clock::now();
+    session->commit();
+    logger_->log_debug("processQueue commit took %llu ms",
+      std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_commit).count());
+
+    pBookmark_->saveBookmarkXml(bookmarkXml);
+
+    if (session->outgoingConnectionsFull("success")) {
+      return false;
+    }
+
+    return true;
+  };
 
-  const auto now = GetTickCount64();
+  size_t eventCount = 0;
+  const auto before_time = std::chrono::high_resolution_clock::now();
+  utils::ScopeGuard timeGuard([&]() {
+    logger_->log_debug("processed %d Events in %llu ms",
+                       eventCount,
+                       std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - before_time).count());
 
 Review comment:
   This is not an `idea`, it is an omission :-)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] am-c-p-p commented on issue #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
am-c-p-p commented on issue #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#issuecomment-590482390
 
 
   @szaszm Correct, this is the purpose of the changes.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r385725765
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,18 +280,83 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml){
+    const auto before_commit = std::chrono::steady_clock::now();
+    session->commit();
+    logger_->log_debug("processQueue commit took %llu ms",
+                       std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - before_commit).count());
 
 Review comment:
   There's already a thread about `%llu`, but this case is somewhat different. Here, the standard doesn't specify the exact type returned by `milliseconds::count`, only that it is a signed integer type of at least 45 bits.
   I suggest assigning the result of `milliseconds::count` to a `uint64_t` (where it will definitely fit), then printing it as a normal `uint64_t`, i.e. with the `"%" PRIu64` format specifier.
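A sketch of this suggestion. `formatCommitTime` is a hypothetical helper that formats with `std::snprintf` instead of the MiNiFi logger.

```cpp
#include <cassert>
#include <chrono>
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <string>

// milliseconds::rep is only guaranteed to be a signed integer type of at least
// 45 bits, so %llu is not guaranteed to match it. Storing the count in a
// uint64_t first gives printf a known type.
std::string formatCommitTime(std::chrono::nanoseconds elapsed) {
  const std::uint64_t ms =
      std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();
  char buf[64];
  std::snprintf(buf, sizeof(buf), "processQueue commit took %" PRIu64 " ms", ms);
  return buf;
}
```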

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r383269381
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -248,21 +239,119 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
     }
   }
 
-  if (subscriptionHandle_) {
-    logger_->log_error("Processor already subscribed to Event Log, expected cleanup to unsubscribe.");
-  } else {
-    sessionFactory_ = sessionFactory;
+  context->getProperty(Channel.getName(), channel_);
+  wstrChannel_ = std::wstring(channel_.begin(), channel_.end());
+
+  std::string query;
+  context->getProperty(Query.getName(), query);
+  wstrQuery_ = std::wstring(query.begin(), query.end());
 
-    subscribe(context);
+  if (!pBookmark_) {
+    std::string bookmarkDir;
+    context->getProperty(BookmarkRootDirectory.getName(), bookmarkDir);
+    if (bookmarkDir.empty()) {
+      logger_->log_error("State Directory is empty");
+      return;
+    }
+    pBookmark_ = std::make_unique<Bookmark>(wstrChannel_, wstrQuery_, bookmarkDir, getUUIDStr(), logger_);
+    if (!*pBookmark_) {
+      pBookmark_.reset();
+      return;
+    }
   }
+
+  context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
+  logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %lld", maxBufferSize_);
+
+  provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query;
 }
 
-void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  if (!subscriptionHandle_) {
-    if (!subscribe(context)) {
-      context->yield();
+void ConsumeWindowsEventLog::processEventsAfterBookmark(core::ProcessSession& session) {
+  // External loop is used in case if there are new events while the events after bookmark are processed.
+  bool hasEvent{};
+  do {
+    hasEvent = false;
+
+    auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
+    if (!hEventResults) {
+      logger_->log_error("!EvtQuery error: %d.", GetLastError());
       return;
     }
+    const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults); });
+
+    if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark)) {
+      logger_->log_error("!EvtSeek error: %d.", GetLastError());
+      return;
+    }
 
 Review comment:
   There seems to be some code duplication with `Bookmark`. Can we extract the common code and reuse it instead?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi-minifi-cpp] bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.

Posted by GitBox <gi...@apache.org>.
bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r385161165
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,19 +279,84 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml){
+    const auto before_commit = std::chrono::high_resolution_clock::now();
+    session->commit();
+    logger_->log_debug("processQueue commit took %llu ms",
+      std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_commit).count());
+
+    pBookmark_->saveBookmarkXml(bookmarkXml);
+
+    if (session->outgoingConnectionsFull("success")) {
+      return false;
+    }
+
+    return true;
+  };
 
-  const auto now = GetTickCount64();
+  size_t eventCount = 0;
+  const auto before_time = std::chrono::high_resolution_clock::now();
+  utils::ScopeGuard timeGuard([&]() {
+    logger_->log_debug("processed %d Events in %llu ms",
+                       eventCount,
+                       std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - before_time).count());
 
 Review comment:
   @am-c-p-p you have changed this single instance from `high_resolution_clock` to `steady_clock`. Subtracting a time point created with one clock type from a time point created by another is a very bad idea.
   Please change *all* instances of `high_resolution_clock` to `steady_clock`.
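For illustration, here is a sketch with both time points taken from `steady_clock`. `ScopeGuard` is a minimal hypothetical stand-in, not `utils::ScopeGuard` from MiNiFi. `processEvents` replaces the real event loop with a counter. `time_point` subtraction is only defined for a single clock type. Mixing `high_resolution_clock::now()` and `steady_clock::now()` therefore compiles only where the standard library makes `high_resolution_clock` an alias of `steady_clock` (as MSVC's does).

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <utility>

// Minimal hypothetical stand-in for a scope guard: runs `fn_` on destruction.
class ScopeGuard {
 public:
  explicit ScopeGuard(std::function<void()> fn) : fn_(std::move(fn)) {}
  ~ScopeGuard() { if (fn_) fn_(); }
  ScopeGuard(const ScopeGuard&) = delete;
  ScopeGuard& operator=(const ScopeGuard&) = delete;
 private:
  std::function<void()> fn_;
};

// Processes `n` dummy events; the guard reports the elapsed time on exit.
std::size_t processEvents(std::size_t n, std::uint64_t& elapsedMs) {
  std::size_t eventCount = 0;
  const auto before_time = std::chrono::steady_clock::now();
  ScopeGuard timeGuard([&]() {
    // Same clock for both time points, so the difference is well-typed and
    // cannot be negative.
    elapsedMs = std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::steady_clock::now() - before_time).count();
  });
  for (std::size_t i = 0; i < n; ++i) {
    ++eventCount;  // stand-in for rendering and committing one event
  }
  return eventCount;
}
```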

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services