You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ba...@apache.org on 2019/11/20 10:37:17 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-1085 - Add
bookmarking support to CWEL
This is an automated email from the ASF dual-hosted git repository.
bakaid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new b8291ec MINIFICPP-1085 - Add bookmarking support to CWEL
b8291ec is described below
commit b8291eca4855b0304f7687641a8233b1dfa22bb0
Author: amarmer <am...@gmail.com>
AuthorDate: Tue Nov 19 15:47:16 2019 -0800
MINIFICPP-1085 - Add bookmarking support to CWEL
Signed-off-by: Daniel Bakai <ba...@apache.org>
This closes #685
---
extensions/windows-event-log/Bookmark.cpp | 223 ++++++++++++++++
extensions/windows-event-log/Bookmark.h | 56 ++++
.../windows-event-log/ConsumeWindowsEventLog.cpp | 284 +++++++++++++++------
.../windows-event-log/ConsumeWindowsEventLog.h | 17 +-
4 files changed, 489 insertions(+), 91 deletions(-)
diff --git a/extensions/windows-event-log/Bookmark.cpp b/extensions/windows-event-log/Bookmark.cpp
new file mode 100644
index 0000000..ada6d16
--- /dev/null
+++ b/extensions/windows-event-log/Bookmark.cpp
@@ -0,0 +1,223 @@
+#include "Bookmark.h"
+
+#include <direct.h>
+
+#include "utils/file/FileUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+/**
+ * Opens (or creates) the bookmark file and creates the Windows Event Log bookmark handle from it.
+ *
+ * The file is "Bookmark.txt" inside the directory produced by createUUIDDir().
+ * ok_ (reported through operator bool) becomes true only when the handle exists and the file is usable.
+ * hasBookmarkXml_ becomes true only when the handle was created from XML stored in the file.
+ *
+ * @param bookmarkRootDir root directory for bookmark state
+ * @param uuid            identifier used as the per-instance sub-directory name
+ * @param logger          logger used for error reporting
+ */
+Bookmark::Bookmark(const std::string& bookmarkRootDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
+  : logger_(logger) {
+  if (!createUUIDDir(bookmarkRootDir, uuid, filePath_)) {
+    return;
+  }
+
+  filePath_ += "Bookmark.txt";
+
+  std::wstring bookmarkXml;
+  if (!getBookmarkXmlFromFile(bookmarkXml)) {
+    return;
+  }
+
+  const bool hadStoredXml = !bookmarkXml.empty();
+  if (hadStoredXml) {
+    hBookmark_ = EvtCreateBookmark(bookmarkXml.c_str());
+    if (hBookmark_) {
+      hasBookmarkXml_ = true;
+      ok_ = true;
+      return;
+    }
+
+    // Read the error code first, then narrow the XML: '%s' expects a narrow string, not wchar_t*.
+    const DWORD error = GetLastError();
+    const std::string narrowXml(bookmarkXml.begin(), bookmarkXml.end());
+    logger_->log_error("!EvtCreateBookmark error: %d bookmarkXml_ '%s'", error, narrowXml.c_str());
+  }
+
+  // Either there was no stored XML, or it was rejected (possibly corrupted): start from an empty bookmark.
+  hBookmark_ = EvtCreateBookmark(nullptr);
+  if (!hBookmark_) {
+    logger_->log_error("!EvtCreateBookmark error: %d", GetLastError());
+    return;
+  }
+
+  hasBookmarkXml_ = false;
+
+  // Rejected XML must not be read again on the next start, so the file is recreated empty.
+  ok_ = hadStoredXml ? createEmptyBookmarkXmlFile() : true;
+}
+
+// Releases the bookmark handle and closes the bookmark file if either is still held.
+Bookmark::~Bookmark() {
+  if (hBookmark_ != nullptr) {
+    EvtClose(hBookmark_);
+  }
+
+  if (file_.is_open()) {
+    file_.close();
+  }
+}
+
+// True when the constructor finished with a usable bookmark (the value of ok_).
+Bookmark::operator bool() const {
+  return ok_;
+}
+
+// True when the bookmark handle was created from XML read from the bookmark file.
+bool Bookmark::hasBookmarkXml() const {
+  return hasBookmarkXml_;
+}
+
+// Returns the bookmark handle; the handle stays owned by this object and is closed in the destructor.
+EVT_HANDLE Bookmark::bookmarkHandle() const {
+  return hBookmark_;
+}
+
+/**
+ * Updates the bookmark to point at hEvent and writes the rendered bookmark XML to the bookmark file.
+ *
+ * @param hEvent event the bookmark should point at
+ * @return false when the bookmark could not be updated or rendered, true otherwise
+ */
+bool Bookmark::saveBookmark(EVT_HANDLE hEvent) {
+  std::wstring renderedXml;
+  const bool rendered = getNewBookmarkXml(hEvent, renderedXml);
+  if (rendered) {
+    saveBookmarkXml(renderedXml);
+  }
+
+  return rendered;
+}
+
+/**
+ * Moves the bookmark to hEvent and renders it as an XML string that can be persisted.
+ *
+ * @param hEvent      event the bookmark should point at
+ * @param bookmarkXml receives the rendered XML; assigned only when true is returned
+ * @return true when the bookmark was updated and rendered
+ */
+bool Bookmark::getNewBookmarkXml(EVT_HANDLE hEvent, std::wstring& bookmarkXml) {
+  if (!EvtUpdateBookmark(hBookmark_, hEvent)) {
+    logger_->log_error("!EvtUpdateBookmark error: %d.", GetLastError());
+    return false;
+  }
+
+  // Probe with an empty buffer: the call is expected to fail and report the required size in bytes.
+  DWORD requiredBytes{};
+  DWORD propertyCount{};
+  if (EvtRender(nullptr, hBookmark_, EvtRenderBookmark, 0, nullptr, &requiredBytes, &propertyCount)) {
+    // Nothing can have been rendered into a zero-sized buffer.
+    return false;
+  }
+
+  const DWORD status = GetLastError();
+  if (ERROR_INSUFFICIENT_BUFFER != status) {
+    if (ERROR_SUCCESS != status) {
+      logger_->log_error("!EvtRender error: %d.", status);
+    }
+    return false;
+  }
+
+  // One spare, zero-initialised element keeps the buffer null-terminated.
+  const DWORD bufferSize = requiredBytes;
+  std::vector<wchar_t> rendered(bufferSize / sizeof(wchar_t) + 1);
+  if (!EvtRender(nullptr, hBookmark_, EvtRenderBookmark, bufferSize, rendered.data(), &requiredBytes, &propertyCount)) {
+    logger_->log_error("!EvtRender error: %d.", GetLastError());
+    return false;
+  }
+
+  bookmarkXml = rendered.data();
+
+  return true;
+}
+
+/**
+ * Writes bookmarkXml at the start of the bookmark file, followed by the terminator '!'.
+ *
+ * The file is overwritten in place instead of truncated; the reader takes everything up to the first '!'.
+ * An empty bookmarkXml is ignored, because writing only '!' would destroy the stored bookmark.
+ *
+ * @param bookmarkXml rendered bookmark XML to persist
+ */
+void Bookmark::saveBookmarkXml(std::wstring& bookmarkXml) {
+  if (bookmarkXml.empty()) {
+    return;
+  }
+
+  // An earlier failed write would leave the stream in a failed state and block every later save.
+  file_.clear();
+
+  // Use the (offset, direction) overload: seekp(std::ios::beg) would treat the direction as a position.
+  file_.seekp(0, std::ios::beg);
+
+  file_ << bookmarkXml << L'!';
+
+  file_.flush();
+
+  if (!file_) {
+    logger_->log_error("Cannot write bookmark to %s", filePath_.c_str());
+  }
+}
+
+
+/**
+ * (Re)opens the bookmark file for output only, closing it first when it is already open.
+ *
+ * @return true when the file is open afterwards
+ */
+bool Bookmark::createEmptyBookmarkXmlFile() {
+  if (file_.is_open()) {
+    file_.close();
+  }
+
+  file_.open(filePath_, std::ios::out);
+
+  const bool opened = file_.is_open();
+  if (!opened) {
+    logger_->log_error("Cannot open %s", filePath_.c_str());
+  }
+
+  return opened;
+}
+
+/**
+ * Builds the per-instance state directory path (root, then "uuid", then the uuid) and creates it.
+ *
+ * @param bookmarkRootDir root directory; an empty value is rejected
+ * @param uuid            name of the innermost directory
+ * @param dir             receives the path with a trailing backslash, or is cleared on failure
+ * @return true when the directory exists after the call
+ */
+bool Bookmark::createUUIDDir(const std::string& bookmarkRootDir, const std::string& uuid, std::string& dir) {
+  if (bookmarkRootDir.empty()) {
+    dir.clear();
+    return false;
+  }
+
+  std::string root = bookmarkRootDir;
+  if (root.back() != '\\') {
+    root += '\\';
+  }
+
+  dir = root + "uuid\\" + uuid + "\\";
+
+  // The result of create_dir is not inspected; success is judged by whether the directory exists.
+  utils::file::FileUtils::create_dir(dir);
+
+  if (utils::file::FileUtils::is_directory(dir.c_str())) {
+    return true;
+  }
+
+  logger_->log_error("Cannot create %s", dir.c_str());
+  dir.clear();
+  return false;
+}
+
+/**
+ * Reads the stored bookmark XML from the bookmark file and leaves file_ open for later writes.
+ *
+ * A missing file is created empty. Content without the '!' terminator is treated as invalid and
+ * the file is recreated empty.
+ *
+ * @param bookmarkXml receives the XML without the terminator; empty when nothing valid is stored
+ * @return false only when the bookmark file cannot be opened or created
+ */
+bool Bookmark::getBookmarkXmlFromFile(std::wstring& bookmarkXml) {
+  bookmarkXml.clear();
+
+  {
+    std::wifstream input(filePath_);
+    if (!input.is_open()) {
+      return createEmptyBookmarkXmlFile();
+    }
+
+    // Character-wise read is acceptable here: the bookmark XML is small (~100 bytes).
+    wchar_t ch;
+    while (input.get(ch)) {
+      bookmarkXml += ch;
+    }
+  }  // input is closed here, before the same file is opened through file_.
+
+  file_.open(filePath_);
+  if (!file_.is_open()) {
+    logger_->log_error("Cannot open %s", filePath_.c_str());
+    bookmarkXml.clear();
+    return false;
+  }
+
+  if (bookmarkXml.empty()) {
+    return true;
+  }
+
+  // '!' marks the end of the bookmark; anything after it is left over from a longer, older bookmark.
+  const auto terminatorPos = bookmarkXml.find(L'!');
+  if (std::wstring::npos == terminatorPos) {
+    // '%s' expects a narrow string, so the wide XML is narrowed for logging.
+    const std::string narrowXml(bookmarkXml.begin(), bookmarkXml.end());
+    logger_->log_error("No '!' in bookmarXml '%s'", narrowXml.c_str());
+    bookmarkXml.clear();
+    return createEmptyBookmarkXmlFile();
+  }
+
+  // Drop the terminator and everything after it.
+  bookmarkXml.resize(terminatorPos);
+
+  return true;
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/windows-event-log/Bookmark.h b/extensions/windows-event-log/Bookmark.h
new file mode 100644
index 0000000..4435734
--- /dev/null
+++ b/extensions/windows-event-log/Bookmark.h
@@ -0,0 +1,56 @@
+#pragma once
+
+#include <fstream>
+#include <string>
+#include <memory>
+#include <windows.h>
+#include <winevt.h>
+
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+/**
+ * Keeps a Windows Event Log bookmark (EVT_HANDLE) together with the file it is persisted in.
+ *
+ * The bookmark XML is stored in a per-instance "Bookmark.txt" file and terminated by '!'.
+ * Test the object with operator bool after construction to find out whether it is usable.
+ */
+class Bookmark
+{
+public:
+  /**
+   * @param bookmarkRootDir root directory for bookmark state
+   * @param uuid            identifier used as the per-instance sub-directory name
+   * @param logger          logger used for error reporting
+   */
+  Bookmark(const std::string& bookmarkRootDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger);
+  ~Bookmark();
+
+  // The object owns the bookmark handle and the open file, so it must not be copied.
+  Bookmark(const Bookmark&) = delete;
+  Bookmark& operator=(const Bookmark&) = delete;
+
+  // True when construction produced a usable bookmark.
+  operator bool() const;
+
+  // True when the handle was created from XML found in the bookmark file.
+  bool hasBookmarkXml() const;
+
+  // Handle owned by this object; callers must not close it.
+  EVT_HANDLE bookmarkHandle() const;
+
+  // Points the bookmark at hEvent and persists it; false when updating or rendering fails.
+  bool saveBookmark(EVT_HANDLE hEvent);
+
+  // Points the bookmark at hEvent and renders it into bookmarkXml without persisting it.
+  bool getNewBookmarkXml(EVT_HANDLE hEvent, std::wstring& bookmarkXml);
+
+  // Writes bookmarkXml, followed by the '!' terminator, at the start of the bookmark file.
+  void saveBookmarkXml(std::wstring& bookmarkXml);
+private:
+  bool createEmptyBookmarkXmlFile();
+
+  bool createUUIDDir(const std::string& bookmarkRootDir, const std::string& uuid, std::string& dir);
+
+  bool getBookmarkXmlFromFile(std::wstring& bookmarkXml);
+
+private:
+  std::shared_ptr<logging::Logger> logger_;
+  std::string filePath_;       // full path of the bookmark file
+  bool ok_{};                  // result reported by operator bool
+  EVT_HANDLE hBookmark_{};     // closed in the destructor
+  std::wfstream file_;         // bookmark file, kept open between saves
+  bool hasBookmarkXml_{};
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
index 77f0f90..cb2668c 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
@@ -38,7 +38,7 @@
#include "io/DataStream.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
-
+#include "Bookmark.h"
#pragma comment(lib, "wevtapi.lib")
#pragma comment(lib, "ole32.lib")
@@ -49,6 +49,7 @@ namespace nifi {
namespace minifi {
namespace processors {
+// ConsumeWindowsEventLog
const std::string ConsumeWindowsEventLog::ProcessorName("ConsumeWindowsEventLog");
core::Property ConsumeWindowsEventLog::Channel(
@@ -139,11 +140,17 @@ core::Property ConsumeWindowsEventLog::BatchCommitSize(
withDescription("Maximum number of Events to consume and create to Flow Files from before committing.")->
build());
+// Optional "State Directory" property: root directory for the bookmark state, "CWELState" by default.
+core::Property ConsumeWindowsEventLog::BookmarkRootDirectory(
+  core::PropertyBuilder::createProperty("State Directory")
+    ->isRequired(false)
+    ->withDefaultValue("CWELState")
+    ->withDescription("Directory which contains processor state data.")
+    ->build());
+
core::Relationship ConsumeWindowsEventLog::Success("success", "Relationship for successfully consumed events.");
ConsumeWindowsEventLog::ConsumeWindowsEventLog(const std::string& name, utils::Identifier uuid)
: core::Processor(name, uuid), logger_(logging::LoggerFactory<ConsumeWindowsEventLog>::getLogger()), apply_identifier_function_(false), batch_commit_size_(0U) {
-
char buff[MAX_COMPUTERNAME_LENGTH + 1];
DWORD size = sizeof(buff);
if (GetComputerName(buff, &size)) {
@@ -161,7 +168,9 @@ ConsumeWindowsEventLog::~ConsumeWindowsEventLog() {
void ConsumeWindowsEventLog::initialize() {
//! Set the supported properties
- setSupportedProperties({Channel, Query, MaxBufferSize, InactiveDurationToReconnect, IdentifierMatcher, IdentifierFunction, ResolveAsAttributes, EventHeaderDelimiter, EventHeader, OutputFormat, BatchCommitSize});
+ setSupportedProperties(
+ {Channel, Query, MaxBufferSize, InactiveDurationToReconnect, IdentifierMatcher, IdentifierFunction, ResolveAsAttributes, EventHeaderDelimiter, EventHeader, OutputFormat, BatchCommitSize, BookmarkRootDirectory}
+ );
//! Set the supported relationships
setSupportedRelationships({Success});
@@ -185,6 +194,17 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
context->getProperty(EventHeaderDelimiter.getName(), header_delimiter_);
context->getProperty(BatchCommitSize.getName(), batch_commit_size_);
+ std::string bookmarkDir;
+ context->getProperty(BookmarkRootDirectory.getName(), bookmarkDir);
+ if (bookmarkDir.empty()) {
+ logger_->log_error("State Directory is empty");
+ } else {
+ pBookmark_ = std::make_unique<Bookmark>(bookmarkDir, getUUIDStr(), logger_);
+ if (!*pBookmark_) {
+ pBookmark_.reset();
+ }
+ }
+
std::string header;
context->getProperty(EventHeader.getName(), header);
@@ -230,6 +250,12 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
}
}
+ std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
+ if (!lock.owns_lock()) {
+ logger_->log_warn("processor was triggered before previous listing finished, configuration should be revised!");
+ return;
+ }
+
const auto flowFileCount = processQueue(session);
const auto now = GetTickCount64();
@@ -260,17 +286,121 @@ wel::WindowsEventLogHandler ConsumeWindowsEventLog::getEventLogHandler(const std
return providers_[name];
}
+/**
+ * Renders one event and enqueues the result on listRenderedData_.
+ *
+ * The event is rendered to XML and its metadata is resolved with a MetadataWalker. Depending on
+ * writePlainText_ and writeXML_, the plain-text and/or XML output is filled in. When a bookmark is
+ * available, the bookmark XML for this event is attached to the rendered data.
+ * Events larger than maxBufferSize_, or events that fail to render or parse, are logged and dropped.
+ *
+ * @param hEvent handle of the event to process; it is not closed here
+ */
+void ConsumeWindowsEventLog::processEvent(EVT_HANDLE hEvent) {
+  // Probe with an empty buffer: the call is expected to fail and report the required size in bytes.
+  DWORD used = 0;
+  DWORD propertyCount = 0;
+  if (EvtRender(NULL, hEvent, EvtRenderEventXml, 0, 0, &used, &propertyCount)) {
+    return;
+  }
+  if (ERROR_INSUFFICIENT_BUFFER != GetLastError()) {
+    return;
+  }
+
+  if (used > maxBufferSize_) {
+    // '%p' for the handle and '%lld' for the size; the previous '%ll' was an incomplete specifier.
+    logger_->log_error("Dropping event %p because it couldn't be rendered within %lld bytes.", hEvent, maxBufferSize_);
+    return;
+  }
+
+  // One spare, zero-initialised element keeps the buffer null-terminated.
+  const DWORD size = used;
+  std::vector<wchar_t> buf(size / sizeof(wchar_t) + 1);
+  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
+    logger_->log_error("!EvtRender error: %d.", GetLastError());
+    return;
+  }
+
+  std::string xml = wel::to_string(&buf[0]);
+
+  pugi::xml_document doc;
+  pugi::xml_parse_result result = doc.load_string(xml.c_str());
+
+  if (!result) {
+    logger_->log_error("Invalid XML produced");
+    return;
+  }
+
+  // this is a well known path.
+  std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
+  wel::MetadataWalker walker(getEventLogHandler(providerName).getMetadata(), channel_, hEvent, !resolve_as_attributes_, apply_identifier_function_, regex_);
+
+  // resolve the event metadata
+  doc.traverse(walker);
+
+  EventRender renderedData;
+
+  if (writePlainText_) {
+    auto handler = getEventLogHandler(providerName);
+    auto message = handler.getEventMessage(hEvent);
+
+    if (!message.empty()) {
+      for (const auto &mapEntry : walker.getIdentifiers()) {
+        // replace the identifiers with their translated strings.
+        utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
+      }
+      wel::WindowsEventLogHeader log_header(header_names_);
+      // set the delimiter
+      log_header.setDelimiter(header_delimiter_);
+      // render the header.
+      renderedData.rendered_text_ = log_header.getEventHeader(&walker);
+      renderedData.rendered_text_ += "Message" + header_delimiter_ + " ";
+      renderedData.rendered_text_ += message;
+    }
+  }
+
+  if (writeXML_) {
+    if (resolve_as_attributes_) {
+      renderedData.matched_fields_ = walker.getFieldValues();
+    }
+
+    wel::XmlString writer;
+    doc.print(writer, "", pugi::format_raw);  // no indentation or formatting
+    xml = writer.xml_;
+
+    renderedData.text_ = std::move(xml);
+  }
+
+  if (pBookmark_) {
+    // bookmarkXml_ stays empty when the bookmark cannot be updated or rendered.
+    std::wstring bookmarkXml;
+    if (pBookmark_->getNewBookmarkXml(hEvent, bookmarkXml)) {
+      renderedData.bookmarkXml_ = std::move(bookmarkXml);
+    }
+  }
+
+  listRenderedData_.enqueue(std::move(renderedData));
+}
+
+/**
+ * Processes every event of the query result that follows the bookmarked event.
+ *
+ * @param hEventResults query result handle to read from
+ * @param channel       not used by this function
+ * @param query         not used by this function
+ * @return false when seeking to the bookmark fails, true otherwise (including EvtNext errors, which are only logged)
+ */
+bool ConsumeWindowsEventLog::processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring& channel, const std::wstring& query) {
+  // Position the result set one event past the bookmarked one.
+  const bool positioned = EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark);
+  if (!positioned) {
+    logger_->log_error("!EvtSeek error %d.", GetLastError());
+    return false;
+  }
+
+  // Fetch and process one event at a time until the result set is exhausted or an error occurs.
+  for (;;) {
+    EVT_HANDLE hNext{};
+    DWORD fetched{};
+    if (EvtNext(hEventResults, 1, &hNext, INFINITE, 0, &fetched)) {
+      processEvent(hNext);
+      EvtClose(hNext);
+      continue;
+    }
+
+    const DWORD status = GetLastError();
+    if (ERROR_NO_MORE_ITEMS != status) {
+      logger_->log_error("!EvtNext error %d.", status);
+    }
+    break;
+  }
+
+  return true;
+}
+
bool ConsumeWindowsEventLog::subscribe(const std::shared_ptr<core::ProcessContext> &context) {
context->getProperty(Channel.getName(), channel_);
-
- std::string query;
- context->getProperty(Query.getName(), query);
+ context->getProperty(Query.getName(), query_);
context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %lld", maxBufferSize_);
- provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query;
+ provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query_;
std::string strInactiveDurationToReconnect;
context->getProperty(InactiveDurationToReconnect.getName(), strInactiveDurationToReconnect);
@@ -282,98 +412,66 @@ bool ConsumeWindowsEventLog::subscribe(const std::shared_ptr<core::ProcessContex
logger_->log_info("inactiveDurationToReconnect: [%lld] ms", inactiveDurationToReconnect_);
}
+ if (!pBookmark_) {
+ logger_->log_error("!pBookmark_");
+ return false;
+ }
+
+ auto channel = std::wstring(channel_.begin(), channel_.end());
+ auto query = std::wstring(query_.begin(), query_.end());
+
+ do {
+ auto hEventResults = EvtQuery(0, channel.c_str(), query.c_str(), EvtQueryChannelPath);
+ if (!hEventResults) {
+ logger_->log_error("!EvtQuery error: %d.", GetLastError());
+ // Consider it as a serious error.
+ return false;
+ }
+ const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults); });
+
+ if (pBookmark_->hasBookmarkXml()) {
+ if (!processEventsAfterBookmark(hEventResults, channel, query)) {
+ break;
+ }
+ } else {
+ // Seek to the last event in the hEventResults.
+ if (!EvtSeek(hEventResults, 0, 0, 0, EvtSeekRelativeToLast)) {
+ logger_->log_error("!EvtSeek error: %d.", GetLastError());
+ break;
+ }
+
+ DWORD dwReturned{};
+ EVT_HANDLE hEvent{};
+ if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
+ logger_->log_error("!EvtNext error: %d.", GetLastError());
+ break;
+ }
+
+ pBookmark_->saveBookmark(hEvent);
+ }
+ } while (false);
+
subscriptionHandle_ = EvtSubscribe(
NULL,
NULL,
- std::wstring(channel_.begin(), channel_.end()).c_str(),
- std::wstring(query.begin(), query.end()).c_str(),
+ channel.c_str(),
+ query.c_str(),
NULL,
this,
- [](EVT_SUBSCRIBE_NOTIFY_ACTION action, PVOID pContext, EVT_HANDLE eventHandle)
+ [](EVT_SUBSCRIBE_NOTIFY_ACTION action, PVOID pContext, EVT_HANDLE hEvent)
{
-
auto pConsumeWindowsEventLog = static_cast<ConsumeWindowsEventLog*>(pContext);
auto& logger = pConsumeWindowsEventLog->logger_;
if (action == EvtSubscribeActionError) {
- if (ERROR_EVT_QUERY_RESULT_STALE == (DWORD)eventHandle) {
+ if (ERROR_EVT_QUERY_RESULT_STALE == (DWORD)hEvent) {
logger->log_error("Received missing event notification. Consider triggering processor more frequently or increasing queue size.");
} else {
- logger->log_error("Received the following Win32 error: %x", eventHandle);
+ logger->log_error("Received the following Win32 error: %x", hEvent);
}
} else if (action == EvtSubscribeActionDeliver) {
- DWORD size = 0;
- DWORD used = 0;
- DWORD propertyCount = 0;
- if (!EvtRender(NULL, eventHandle, EvtRenderEventXml, size, 0, &used, &propertyCount)) {
- if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) {
- if (used > pConsumeWindowsEventLog->maxBufferSize_) {
- logger->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", eventHandle, pConsumeWindowsEventLog->maxBufferSize_);
- return 0UL;
- }
-
- size = used;
- std::vector<wchar_t> buf(size/2 + 1);
- if (EvtRender(NULL, eventHandle, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
- std::string xml = wel::to_string(&buf[0]);
-
- EventRender renderedData;
-
- pugi::xml_document doc;
- pugi::xml_parse_result result = doc.load_string(xml.c_str());
-
- if (!result) {
- logger->log_error("Invalid XML produced");
- return 0UL;
- }
- // this is a well known path.
- std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
- wel::MetadataWalker walker(pConsumeWindowsEventLog->getEventLogHandler(providerName).getMetadata(),
- pConsumeWindowsEventLog->channel_, eventHandle, !pConsumeWindowsEventLog->resolve_as_attributes_,
- pConsumeWindowsEventLog->apply_identifier_function_, pConsumeWindowsEventLog->regex_);
-
- // resolve the event metadata
- doc.traverse(walker);
-
- if (pConsumeWindowsEventLog->writePlainText_) {
- auto handler = pConsumeWindowsEventLog->getEventLogHandler(providerName);
- auto message = handler.getEventMessage(eventHandle);
-
- if (!message.empty()) {
-
- for (const auto &mapEntry : walker.getIdentifiers()) {
- // replace the identifiers with their translated strings.
- utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
- }
- wel::WindowsEventLogHeader log_header(pConsumeWindowsEventLog->header_names_);
- // set the delimiter
- log_header.setDelimiter(pConsumeWindowsEventLog->header_delimiter_);
- // render the header.
- renderedData.rendered_text_ = log_header.getEventHeader(&walker);
- renderedData.rendered_text_ += "Message" + pConsumeWindowsEventLog->header_delimiter_ + " ";
- renderedData.rendered_text_ += message;
- }
- }
-
- if (pConsumeWindowsEventLog->writeXML_) {
- if (pConsumeWindowsEventLog->resolve_as_attributes_) {
- renderedData.matched_fields_ = walker.getFieldValues();
- }
-
- wel::XmlString writer;
- doc.print(writer,"", pugi::format_raw); // no indentation or formatting
- xml = writer.xml_;
-
- renderedData.text_ = std::move(xml);
- }
-
- pConsumeWindowsEventLog->listRenderedData_.enqueue(std::move(renderedData));
- } else {
- logger->log_error("EvtRender returned the following error code: %d.", GetLastError());
- }
- }
- }
+ pConsumeWindowsEventLog->processEvent(hEvent);
}
return 0UL;
@@ -423,8 +521,12 @@ int ConsumeWindowsEventLog::processQueue(const std::shared_ptr<core::ProcessSess
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_time).count());
});
+ bool commitAndSaveBookmark = false;
+
EventRender evt;
while (listRenderedData_.try_dequeue(evt)) {
+ commitAndSaveBookmark = true;
+
if (writeXML_) {
auto flowFile = session->create();
@@ -457,6 +559,20 @@ int ConsumeWindowsEventLog::processQueue(const std::shared_ptr<core::ProcessSess
session->commit();
logger_->log_debug("processQueue commit took %llu ms",
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_commit).count());
+
+ if (pBookmark_) {
+ pBookmark_->saveBookmarkXml(evt.bookmarkXml_);
+ }
+
+ commitAndSaveBookmark = false;
+ }
+ }
+
+ if (commitAndSaveBookmark) {
+ session->commit();
+
+ if (pBookmark_) {
+ pBookmark_->saveBookmarkXml(evt.bookmarkXml_);
}
}
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.h b/extensions/windows-event-log/ConsumeWindowsEventLog.h
index ea8bef1..31ab521 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.h
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.h
@@ -33,9 +33,7 @@
#include <codecvt>
#include "utils/OsUtils.h"
#include <Objbase.h>
-
-
-//#import <msxml6.dll>
+#include <mutex>
namespace org {
namespace apache {
@@ -47,8 +45,11 @@ struct EventRender {
std::map<std::string, std::string> matched_fields_;
std::string text_;
std::string rendered_text_;
+ std::wstring bookmarkXml_;
};
+class Bookmark;
+
//! ConsumeWindowsEventLog Class
class ConsumeWindowsEventLog : public core::Processor
{
@@ -78,6 +79,7 @@ public:
static core::Property EventHeader;
static core::Property OutputFormat;
static core::Property BatchCommitSize;
+ static core::Property BookmarkRootDirectory;
//! Supported Relationships
static core::Relationship Success;
@@ -101,12 +103,11 @@ protected:
bool subscribe(const std::shared_ptr<core::ProcessContext> &context);
void unsubscribe();
int processQueue(const std::shared_ptr<core::ProcessSession> &session);
-
wel::WindowsEventLogHandler getEventLogHandler(const std::string & name);
-
bool insertHeaderName(wel::METADATA_NAMES &header, const std::string &key, const std::string &value);
-
void LogWindowsError();
+ void processEvent(EVT_HANDLE eventHandle);
+ bool processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring& channel, const std::wstring& query);
static constexpr const char * const XML = "XML";
static constexpr const char * const Both = "Both";
@@ -118,6 +119,7 @@ private:
wel::METADATA_NAMES header_names_;
std::string header_delimiter_;
std::string channel_;
+ std::string query_;
std::shared_ptr<logging::Logger> logger_;
std::string regex_;
bool resolve_as_attributes_;
@@ -132,11 +134,12 @@ private:
std::shared_ptr<core::ProcessSessionFactory> sessionFactory_;
std::mutex cache_mutex_;
std::map<std::string, wel::WindowsEventLogHandler > providers_;
-
uint64_t batch_commit_size_;
bool writeXML_;
bool writePlainText_;
+ std::unique_ptr<Bookmark> pBookmark_;
+ std::mutex onTriggerMutex_;
};
REGISTER_RESOURCE(ConsumeWindowsEventLog, "Windows Event Log Subscribe Callback to receive FlowFiles from Events on Windows.");