You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ba...@apache.org on 2019/11/20 10:37:17 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-1085 - Add bookmarking support to CWEL

This is an automated email from the ASF dual-hosted git repository.

bakaid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new b8291ec  MINIFICPP-1085 - Add bookmarking support to CWEL
b8291ec is described below

commit b8291eca4855b0304f7687641a8233b1dfa22bb0
Author: amarmer <am...@gmail.com>
AuthorDate: Tue Nov 19 15:47:16 2019 -0800

    MINIFICPP-1085 - Add bookmarking support to CWEL
    
    Signed-off-by: Daniel Bakai <ba...@apache.org>
    
    This closes #685
---
 extensions/windows-event-log/Bookmark.cpp          | 223 ++++++++++++++++
 extensions/windows-event-log/Bookmark.h            |  56 ++++
 .../windows-event-log/ConsumeWindowsEventLog.cpp   | 284 +++++++++++++++------
 .../windows-event-log/ConsumeWindowsEventLog.h     |  17 +-
 4 files changed, 489 insertions(+), 91 deletions(-)

diff --git a/extensions/windows-event-log/Bookmark.cpp b/extensions/windows-event-log/Bookmark.cpp
new file mode 100644
index 0000000..ada6d16
--- /dev/null
+++ b/extensions/windows-event-log/Bookmark.cpp
@@ -0,0 +1,223 @@
+#include "Bookmark.h"
+
+#include <direct.h>
+
+#include "utils/file/FileUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+Bookmark::Bookmark(const std::string& bookmarkRootDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
+  :logger_(logger) {
+  if (!createUUIDDir(bookmarkRootDir, uuid, filePath_))
+    return;
+
+  filePath_ += "Bookmark.txt";
+
+  std::wstring bookmarkXml;
+  if (!getBookmarkXmlFromFile(bookmarkXml)) {
+    return;
+  }
+
+  if (bookmarkXml.empty()) {
+    if (!(hBookmark_ = EvtCreateBookmark(0))) {
+      logger_->log_error("!EvtCreateBookmark error: %d", GetLastError());
+      return;
+    }
+
+    hasBookmarkXml_ = false;
+  } else {
+    if (!(hBookmark_ = EvtCreateBookmark(bookmarkXml.c_str()))) {
+      logger_->log_error("!EvtCreateBookmark error: %d bookmarkXml_ '%s'", GetLastError(), bookmarkXml.c_str());
+
+      // BookmarkXml can be corrupted - create hBookmark_, and create empty file. 
+      if (!(hBookmark_ = EvtCreateBookmark(0))) {
+        logger_->log_error("!EvtCreateBookmark error: %d", GetLastError());
+        return;
+      }
+
+      hasBookmarkXml_ = false;
+
+      ok_ = createEmptyBookmarkXmlFile();
+
+      return;
+    }
+
+    hasBookmarkXml_ = true;
+  }
+
+  ok_ = true;
+}
+
+Bookmark::~Bookmark() {
+  if (file_.is_open()) {
+    file_.close();
+  }
+
+  if (hBookmark_) {
+    EvtClose(hBookmark_);
+  }
+}
+
+Bookmark::operator bool() const {
+  return ok_;
+}
+  
+bool Bookmark::hasBookmarkXml() const {
+  return hasBookmarkXml_;
+}
+
+EVT_HANDLE Bookmark::bookmarkHandle() const {
+  return hBookmark_;
+}
+
+bool Bookmark::saveBookmark(EVT_HANDLE hEvent)
+{
+  std::wstring bookmarkXml;
+  if (!getNewBookmarkXml(hEvent, bookmarkXml)) {
+    return false;
+  }
+
+  saveBookmarkXml(bookmarkXml);
+
+  return true;
+}
+
+bool Bookmark::getNewBookmarkXml(EVT_HANDLE hEvent, std::wstring& bookmarkXml) {
+  if (!EvtUpdateBookmark(hBookmark_, hEvent)) {
+    logger_->log_error("!EvtUpdateBookmark error: %d.", GetLastError());
+    return false;
+  }
+
+  // Render the bookmark as an XML string that can be persisted.
+  DWORD bufferSize{};
+  DWORD bufferUsed{};
+  DWORD propertyCount{};
+  if (!EvtRender(0, hBookmark_, EvtRenderBookmark, bufferSize, 0, &bufferUsed, &propertyCount)) {
+    DWORD status = ERROR_SUCCESS;
+    if (ERROR_INSUFFICIENT_BUFFER == (status = GetLastError())) {
+      bufferSize = bufferUsed;
+
+      std::vector<wchar_t> buf(bufferSize / 2 + 1);
+
+      if (!EvtRender(0, hBookmark_, EvtRenderBookmark, bufferSize, &buf[0], &bufferUsed, &propertyCount)) {
+        logger_->log_error("!EvtRender error: %d.", GetLastError());
+        return false;
+      }
+
+      bookmarkXml = &buf[0];
+
+      return true;
+    }
+    else if (ERROR_SUCCESS != (status = GetLastError())) {
+      logger_->log_error("!EvtRender error: %d.", GetLastError());
+      return false;
+    }
+  }
+
+  return false;
+}
+
+void Bookmark::saveBookmarkXml(std::wstring& bookmarkXml) {
+  // Write new bookmark over old and in the end write '!'. Then new bookmark is read until '!'. This is faster than truncate.
+  file_.seekp(std::ios::beg);
+
+  file_ << bookmarkXml << L'!';
+
+  file_.flush();
+}
+
+
+bool Bookmark::createEmptyBookmarkXmlFile() {
+  if (file_.is_open()) {
+    file_.close();
+  }
+
+  file_.open(filePath_, std::ios::out);
+  if (!file_.is_open()) {
+    logger_->log_error("Cannot open %s", filePath_.c_str());
+    return false;
+  }
+
+  return true;
+}
+
+bool Bookmark::createUUIDDir(const std::string& bookmarkRootDir, const std::string& uuid, std::string& dir)
+{
+  if (bookmarkRootDir.empty()) {
+    dir.clear();
+    return false;
+  }
+
+  auto dirWithBackslash = bookmarkRootDir;
+  if (bookmarkRootDir.back() != '\\') {
+    dirWithBackslash += '\\';
+  }
+  
+  dir = dirWithBackslash + "uuid\\" + uuid + "\\";
+
+  utils::file::FileUtils::create_dir(dir);
+
+  auto dirCreated = utils::file::FileUtils::is_directory(dir.c_str());
+  if (!dirCreated) {
+    logger_->log_error("Cannot create %s", dir.c_str());
+    dir.clear();
+  }
+
+  return dirCreated;
+}
+
+bool Bookmark::getBookmarkXmlFromFile(std::wstring& bookmarkXml) {
+  bookmarkXml.clear();
+
+  std::wifstream file(filePath_);
+  if (!file.is_open()) {
+    return createEmptyBookmarkXmlFile();
+  }
+
+  // Generically is not efficient, but bookmarkXML is small ~100 bytes. 
+  wchar_t c;
+  do {
+    file.read(&c, 1);
+    if (!file) {
+      break;
+    }
+
+    bookmarkXml += c;
+  } while (true);
+
+  file.close();
+
+  file_.open(filePath_);
+  if (!file_.is_open()) {
+    logger_->log_error("Cannot open %s", filePath_.c_str());
+    bookmarkXml.clear();
+    return false;
+  }
+
+  if (bookmarkXml.empty()) {
+    return true;
+  }
+
+  // '!' should be at the end of bookmark.
+  auto pos = bookmarkXml.find(L'!');
+  if (std::wstring::npos == pos) {
+    logger_->log_error("No '!' in bookmarXml '%s'", bookmarkXml.c_str());
+    bookmarkXml.clear();
+    return createEmptyBookmarkXmlFile();
+  }
+
+  // Remove '!'.
+  bookmarkXml.resize(pos);
+
+  return true;
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/windows-event-log/Bookmark.h b/extensions/windows-event-log/Bookmark.h
new file mode 100644
index 0000000..4435734
--- /dev/null
+++ b/extensions/windows-event-log/Bookmark.h
@@ -0,0 +1,56 @@
+#pragma once
+
+#include <string>
+#include <memory>
+#include <windows.h>
+#include <winevt.h>
+
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class Bookmark
+{
+public:
+  Bookmark(const std::string& bookmarkRootDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger);
+  ~Bookmark();
+
+  operator bool() const;
+  
+  bool hasBookmarkXml() const;
+
+  EVT_HANDLE bookmarkHandle() const;
+
+  bool saveBookmark(EVT_HANDLE hEvent);
+
+  bool getNewBookmarkXml(EVT_HANDLE hEvent, std::wstring& bookmarkXml);
+
+  void saveBookmarkXml(std::wstring& bookmarkXml);
+private:
+  bool createEmptyBookmarkXmlFile();
+
+  bool createUUIDDir(const std::string& bookmarkRootDir, const std::string& uuid, std::string& dir);
+
+  std::string filePath(const std::string& uuid);
+
+  bool getBookmarkXmlFromFile(std::wstring& bookmarkXml);
+
+private:
+  std::shared_ptr<logging::Logger> logger_;
+  std::string filePath_;
+  bool ok_{};
+  EVT_HANDLE hBookmark_{};
+  std::wfstream file_;
+  bool hasBookmarkXml_{};
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
index 77f0f90..cb2668c 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
@@ -38,7 +38,7 @@
 #include "io/DataStream.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
-
+#include "Bookmark.h"
 
 #pragma comment(lib, "wevtapi.lib")
 #pragma comment(lib, "ole32.lib")
@@ -49,6 +49,7 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
+// ConsumeWindowsEventLog
 const std::string ConsumeWindowsEventLog::ProcessorName("ConsumeWindowsEventLog");
 
 core::Property ConsumeWindowsEventLog::Channel(
@@ -139,11 +140,17 @@ core::Property ConsumeWindowsEventLog::BatchCommitSize(
   withDescription("Maximum number of Events to consume and create to Flow Files from before committing.")->
   build());
 
+core::Property ConsumeWindowsEventLog::BookmarkRootDirectory(
+  core::PropertyBuilder::createProperty("State Directory")->
+  isRequired(false)->
+  withDefaultValue("CWELState")->
+  withDescription("Directory which contains processor state data.")->
+  build());
+
 core::Relationship ConsumeWindowsEventLog::Success("success", "Relationship for successfully consumed events.");
 
 ConsumeWindowsEventLog::ConsumeWindowsEventLog(const std::string& name, utils::Identifier uuid)
   : core::Processor(name, uuid), logger_(logging::LoggerFactory<ConsumeWindowsEventLog>::getLogger()), apply_identifier_function_(false), batch_commit_size_(0U) {
-
   char buff[MAX_COMPUTERNAME_LENGTH + 1];
   DWORD size = sizeof(buff);
   if (GetComputerName(buff, &size)) {
@@ -161,7 +168,9 @@ ConsumeWindowsEventLog::~ConsumeWindowsEventLog() {
 
 void ConsumeWindowsEventLog::initialize() {
   //! Set the supported properties
-  setSupportedProperties({Channel, Query, MaxBufferSize, InactiveDurationToReconnect, IdentifierMatcher, IdentifierFunction, ResolveAsAttributes, EventHeaderDelimiter, EventHeader, OutputFormat, BatchCommitSize});
+  setSupportedProperties(
+    {Channel, Query, MaxBufferSize, InactiveDurationToReconnect, IdentifierMatcher, IdentifierFunction, ResolveAsAttributes, EventHeaderDelimiter, EventHeader, OutputFormat, BatchCommitSize, BookmarkRootDirectory}
+  );
 
   //! Set the supported relationships
   setSupportedRelationships({Success});
@@ -185,6 +194,17 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
   context->getProperty(EventHeaderDelimiter.getName(), header_delimiter_);
   context->getProperty(BatchCommitSize.getName(), batch_commit_size_);
 
+  std::string bookmarkDir;
+  context->getProperty(BookmarkRootDirectory.getName(), bookmarkDir);
+  if (bookmarkDir.empty()) {
+    logger_->log_error("State Directory is empty");
+  } else {
+    pBookmark_ = std::make_unique<Bookmark>(bookmarkDir, getUUIDStr(), logger_);
+    if (!*pBookmark_) {
+      pBookmark_.reset();
+    }
+  }
+
   std::string header;
   context->getProperty(EventHeader.getName(), header);
 
@@ -230,6 +250,12 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
     }
   }
 
+  std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
+  if (!lock.owns_lock()) {
+    logger_->log_warn("processor was triggered before previous listing finished, configuration should be revised!");
+    return;
+  }
+
   const auto flowFileCount = processQueue(session);
 
   const auto now = GetTickCount64();
@@ -260,17 +286,121 @@ wel::WindowsEventLogHandler ConsumeWindowsEventLog::getEventLogHandler(const std
   return providers_[name];
 } 
 
+void ConsumeWindowsEventLog::processEvent(EVT_HANDLE hEvent) {
+  DWORD size = 0;
+  DWORD used = 0;
+  DWORD propertyCount = 0;
+  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount)) {
+    if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) {
+      if (used > maxBufferSize_) {
+        logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
+        return;
+      }
+
+      size = used;
+      std::vector<wchar_t> buf(size / 2 + 1);
+      if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
+        logger_->log_error("!EvtRender error: %d.", GetLastError());
+        return;
+      }
+
+      std::string xml = wel::to_string(&buf[0]);
+
+      pugi::xml_document doc;
+      pugi::xml_parse_result result = doc.load_string(xml.c_str());
+
+      if (!result) {
+        logger_->log_error("Invalid XML produced");
+        return;
+      }
+      // this is a well known path. 
+      std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
+      wel::MetadataWalker walker(getEventLogHandler(providerName).getMetadata(), channel_, hEvent, !resolve_as_attributes_, apply_identifier_function_, regex_);
+
+      // resolve the event metadata
+      doc.traverse(walker);
+
+      EventRender renderedData;
+
+      if (writePlainText_) {
+        auto handler = getEventLogHandler(providerName);
+        auto message = handler.getEventMessage(hEvent);
+
+        if (!message.empty()) {
+
+          for (const auto &mapEntry : walker.getIdentifiers()) {
+            // replace the identifiers with their translated strings.
+            utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
+          }
+          wel::WindowsEventLogHeader log_header(header_names_);
+          // set the delimiter
+          log_header.setDelimiter(header_delimiter_);
+          // render the header.
+          renderedData.rendered_text_ = log_header.getEventHeader(&walker);
+          renderedData.rendered_text_ += "Message" + header_delimiter_ + " ";
+          renderedData.rendered_text_ += message;
+        }
+      }
+
+      if (writeXML_) {
+        if (resolve_as_attributes_) {
+          renderedData.matched_fields_ = walker.getFieldValues();
+        }
+
+        wel::XmlString writer;
+        doc.print(writer, "", pugi::format_raw); // no indentation or formatting
+        xml = writer.xml_;
+
+        renderedData.text_ = std::move(xml);
+      }
+
+      if (pBookmark_) {
+        std::wstring bookmarkXml;
+        if (pBookmark_->getNewBookmarkXml(hEvent, bookmarkXml)) {
+          renderedData.bookmarkXml_ = bookmarkXml;
+        }
+      }
+
+      listRenderedData_.enqueue(std::move(renderedData));
+    }
+  }
+}
+
+bool ConsumeWindowsEventLog::processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring& channel, const std::wstring& query) {
+  if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark)) {
+    logger_->log_error("!EvtSeek error %d.", GetLastError());
+    return false;
+  }
+
+  // Enumerate the events in the result set after the bookmarked event.
+  while (true) {
+    EVT_HANDLE hEvent{};
+    DWORD dwReturned{};
+    if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
+      DWORD status = ERROR_SUCCESS;
+      if (ERROR_NO_MORE_ITEMS != (status = GetLastError())) {
+        logger_->log_error("!EvtNext error %d.", status);
+      }
+      break;
+    }
+
+    processEvent(hEvent);
+
+    EvtClose(hEvent);
+  }
+
+  return true;
+}
+
 
 bool ConsumeWindowsEventLog::subscribe(const std::shared_ptr<core::ProcessContext> &context) {
   context->getProperty(Channel.getName(), channel_);
-
-  std::string query;
-  context->getProperty(Query.getName(), query);
+  context->getProperty(Query.getName(), query_);
 
   context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
   logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %lld", maxBufferSize_);
 
-  provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query;
+  provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query_;
 
   std::string strInactiveDurationToReconnect;
   context->getProperty(InactiveDurationToReconnect.getName(), strInactiveDurationToReconnect);
@@ -282,98 +412,66 @@ bool ConsumeWindowsEventLog::subscribe(const std::shared_ptr<core::ProcessContex
     logger_->log_info("inactiveDurationToReconnect: [%lld] ms", inactiveDurationToReconnect_);
   }
 
+  if (!pBookmark_) {
+    logger_->log_error("!pBookmark_");
+    return false;
+  }
+
+  auto channel = std::wstring(channel_.begin(), channel_.end());
+  auto query = std::wstring(query_.begin(), query_.end());
+
+  do {
+    auto hEventResults = EvtQuery(0, channel.c_str(), query.c_str(), EvtQueryChannelPath);
+    if (!hEventResults) {
+      logger_->log_error("!EvtQuery error: %d.", GetLastError());
+      // Consider it as a serious error.
+      return false;
+    }
+    const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults); });
+
+    if (pBookmark_->hasBookmarkXml()) {
+      if (!processEventsAfterBookmark(hEventResults, channel, query)) {
+        break;
+      }
+    } else {
+      // Seek to the last event in the hEventResults.
+      if (!EvtSeek(hEventResults, 0, 0, 0, EvtSeekRelativeToLast)) {
+        logger_->log_error("!EvtSeek error: %d.", GetLastError());
+        break;
+      }
+
+      DWORD dwReturned{};
+      EVT_HANDLE hEvent{};
+      if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
+        logger_->log_error("!EvtNext error: %d.", GetLastError());
+        break;
+      }
+
+      pBookmark_->saveBookmark(hEvent);
+    }
+  } while (false);
+
   subscriptionHandle_ = EvtSubscribe(
       NULL,
       NULL,
-      std::wstring(channel_.begin(), channel_.end()).c_str(),
-      std::wstring(query.begin(), query.end()).c_str(),
+      channel.c_str(),
+      query.c_str(),
       NULL,
       this,
-      [](EVT_SUBSCRIBE_NOTIFY_ACTION action, PVOID pContext, EVT_HANDLE eventHandle)
+      [](EVT_SUBSCRIBE_NOTIFY_ACTION action, PVOID pContext, EVT_HANDLE hEvent)
       {
-  
         auto pConsumeWindowsEventLog = static_cast<ConsumeWindowsEventLog*>(pContext);
 
         auto& logger = pConsumeWindowsEventLog->logger_;
 
         if (action == EvtSubscribeActionError) {
-          if (ERROR_EVT_QUERY_RESULT_STALE == (DWORD)eventHandle) {
+          if (ERROR_EVT_QUERY_RESULT_STALE == (DWORD)hEvent) {
             logger->log_error("Received missing event notification. Consider triggering processor more frequently or increasing queue size.");
           } else {
-            logger->log_error("Received the following Win32 error: %x", eventHandle);
+            logger->log_error("Received the following Win32 error: %x", hEvent);
           }
         } else if (action == EvtSubscribeActionDeliver) {
-          DWORD size = 0;
-          DWORD used = 0;
-          DWORD propertyCount = 0;
-          if (!EvtRender(NULL, eventHandle, EvtRenderEventXml, size, 0, &used, &propertyCount)) {
-            if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) {
-              if (used > pConsumeWindowsEventLog->maxBufferSize_) {
-                logger->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", eventHandle, pConsumeWindowsEventLog->maxBufferSize_);
-                return 0UL;
-              }
-
-              size = used;
-              std::vector<wchar_t> buf(size/2 + 1);
-              if (EvtRender(NULL, eventHandle, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
-                std::string xml = wel::to_string(&buf[0]);
-
-                EventRender renderedData;
-
-                pugi::xml_document doc;
-                pugi::xml_parse_result result = doc.load_string(xml.c_str());
-
-                if (!result) {
-                  logger->log_error("Invalid XML produced");
-                  return 0UL;
-                }
-                // this is a well known path. 
-                std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
-                wel::MetadataWalker walker(pConsumeWindowsEventLog->getEventLogHandler(providerName).getMetadata(),
-                    pConsumeWindowsEventLog->channel_, eventHandle, !pConsumeWindowsEventLog->resolve_as_attributes_,
-                    pConsumeWindowsEventLog->apply_identifier_function_, pConsumeWindowsEventLog->regex_);
-
-                // resolve the event metadata
-                doc.traverse(walker);
-
-                if (pConsumeWindowsEventLog->writePlainText_) {
-                  auto handler = pConsumeWindowsEventLog->getEventLogHandler(providerName);
-                  auto message = handler.getEventMessage(eventHandle);
-
-                  if (!message.empty()) {
-
-                    for (const auto &mapEntry : walker.getIdentifiers()) {
-                      // replace the identifiers with their translated strings.
-                      utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
-                    }
-                    wel::WindowsEventLogHeader log_header(pConsumeWindowsEventLog->header_names_);
-                    // set the delimiter
-                    log_header.setDelimiter(pConsumeWindowsEventLog->header_delimiter_);
-                    // render the header.
-                    renderedData.rendered_text_ = log_header.getEventHeader(&walker);
-                    renderedData.rendered_text_ += "Message" + pConsumeWindowsEventLog->header_delimiter_ + " ";
-                    renderedData.rendered_text_ += message;
-                  }
-                }
-
-                if (pConsumeWindowsEventLog->writeXML_) {
-                  if (pConsumeWindowsEventLog->resolve_as_attributes_) {
-                    renderedData.matched_fields_ = walker.getFieldValues();
-                  }
-
-                  wel::XmlString writer;
-                  doc.print(writer,"", pugi::format_raw); // no indentation or formatting
-                  xml = writer.xml_;
-
-                  renderedData.text_ = std::move(xml);
-                }
-
-                pConsumeWindowsEventLog->listRenderedData_.enqueue(std::move(renderedData));
-              } else {
-                logger->log_error("EvtRender returned the following error code: %d.", GetLastError());
-              }
-            }
-          }
+          pConsumeWindowsEventLog->processEvent(hEvent);
         }
 
         return 0UL;
@@ -423,8 +521,12 @@ int ConsumeWindowsEventLog::processQueue(const std::shared_ptr<core::ProcessSess
                       std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_time).count());
   });
 
+  bool commitAndSaveBookmark = false;
+
   EventRender evt;
   while (listRenderedData_.try_dequeue(evt)) {
+    commitAndSaveBookmark = true;
+
     if (writeXML_) {
       auto flowFile = session->create();
 
@@ -457,6 +559,20 @@ int ConsumeWindowsEventLog::processQueue(const std::shared_ptr<core::ProcessSess
       session->commit();
       logger_->log_debug("processQueue commit took %llu ms",
                         std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_commit).count());
+
+      if (pBookmark_) {
+        pBookmark_->saveBookmarkXml(evt.bookmarkXml_);
+      }
+
+      commitAndSaveBookmark = false;
+    }
+  }
+
+  if (commitAndSaveBookmark) {
+    session->commit();
+
+    if (pBookmark_) {
+      pBookmark_->saveBookmarkXml(evt.bookmarkXml_);
     }
   }
 
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.h b/extensions/windows-event-log/ConsumeWindowsEventLog.h
index ea8bef1..31ab521 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.h
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.h
@@ -33,9 +33,7 @@
 #include <codecvt>
 #include "utils/OsUtils.h"
 #include <Objbase.h>
-
-
-//#import <msxml6.dll>
+#include <mutex>
 
 namespace org {
 namespace apache {
@@ -47,8 +45,11 @@ struct EventRender {
 	std::map<std::string, std::string> matched_fields_;
 	std::string text_;
 	std::string rendered_text_;
+  std::wstring bookmarkXml_;
 };
 
+class Bookmark;
+
 //! ConsumeWindowsEventLog Class
 class ConsumeWindowsEventLog : public core::Processor
 {
@@ -78,6 +79,7 @@ public:
   static core::Property EventHeader;
   static core::Property OutputFormat;
   static core::Property BatchCommitSize;
+  static core::Property BookmarkRootDirectory;
 
   //! Supported Relationships
   static core::Relationship Success;
@@ -101,12 +103,11 @@ protected:
   bool subscribe(const std::shared_ptr<core::ProcessContext> &context);
   void unsubscribe();
   int processQueue(const std::shared_ptr<core::ProcessSession> &session);
-  
   wel::WindowsEventLogHandler getEventLogHandler(const std::string & name);
-
   bool insertHeaderName(wel::METADATA_NAMES &header, const std::string &key, const std::string &value);
-
   void LogWindowsError();
+  void processEvent(EVT_HANDLE eventHandle);
+  bool processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring& channel, const std::wstring& query);
 
   static constexpr const char * const XML = "XML";
   static constexpr const char * const Both = "Both";
@@ -118,6 +119,7 @@ private:
   wel::METADATA_NAMES header_names_;
   std::string header_delimiter_;
   std::string channel_;
+  std::string query_;
   std::shared_ptr<logging::Logger> logger_;
   std::string regex_;
   bool resolve_as_attributes_;
@@ -132,11 +134,12 @@ private:
   std::shared_ptr<core::ProcessSessionFactory> sessionFactory_;
   std::mutex cache_mutex_;
   std::map<std::string, wel::WindowsEventLogHandler > providers_;
-
   uint64_t batch_commit_size_;
 
   bool writeXML_;
   bool writePlainText_;
+  std::unique_ptr<Bookmark> pBookmark_;
+  std::mutex onTriggerMutex_;
 };
 
 REGISTER_RESOURCE(ConsumeWindowsEventLog, "Windows Event Log Subscribe Callback to receive FlowFiles from Events on Windows.");