You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2019/06/07 18:17:55 UTC
[nifi-minifi-cpp] branch master updated: minificpp-773 Implemented
Text rendering.
This is an automated email from the ASF dual-hosted git repository.
phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 47b0413 minificpp-773 Implemented Text rendering.
47b0413 is described below
commit 47b041396d127ba08e8651a64f39b9c33fa3d593
Author: amarmer <am...@AMARMER-5530-85>
AuthorDate: Tue Jun 4 14:07:00 2019 -0700
minificpp-773 Implemented Text rendering.
This closes #584.
Signed-off-by: Marc Parisi <ph...@apache.org>
---
.../windows-event-log/ConsumeWindowsEventLog.cpp | 131 ++++++++++++++++++---
.../windows-event-log/ConsumeWindowsEventLog.h | 14 ++-
2 files changed, 124 insertions(+), 21 deletions(-)
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
index afc6d87..fc07e0f 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
@@ -29,6 +29,7 @@
#include <iostream>
#include <memory>
#include <codecvt>
+#include <regex>
#include "io/DataStream.h"
#include "core/ProcessContext.h"
@@ -42,6 +43,10 @@ namespace nifi {
namespace minifi {
namespace processors {
+static std::string to_string(const wchar_t* pChar) {
+ return std::wstring_convert<std::codecvt_utf8<wchar_t>>().to_bytes(pChar);
+}
+
const std::string ConsumeWindowsEventLog::ProcessorName("ConsumeWindowsEventLog");
core::Property ConsumeWindowsEventLog::Channel(
@@ -60,6 +65,14 @@ core::Property ConsumeWindowsEventLog::Query(
supportsExpressionLanguage(true)->
build());
+core::Property ConsumeWindowsEventLog::RenderFormatXML(
+ core::PropertyBuilder::createProperty("Render Format XML?")->
+ isRequired(true)->
+ withDefaultValue<bool>(true)->
+ withDescription("Render format XML or Text.)")->
+ supportsExpressionLanguage(true)->
+ build());
+
core::Property ConsumeWindowsEventLog::MaxBufferSize(
core::PropertyBuilder::createProperty("Max Buffer Size")->
isRequired(true)->
@@ -84,6 +97,9 @@ core::Relationship ConsumeWindowsEventLog::Success("success", "Relationship for
ConsumeWindowsEventLog::ConsumeWindowsEventLog(const std::string& name, utils::Identifier uuid)
: core::Processor(name, uuid), logger_(logging::LoggerFactory<ConsumeWindowsEventLog>::getLogger()) {
+ // Initializes COM for current thread, it is needed to MSXML parser.
+ CoInitializeEx(0, COINIT_APARTMENTTHREADED);
+
char buff[MAX_COMPUTERNAME_LENGTH + 1];
DWORD size = sizeof(buff);
if (GetComputerName(buff, &size)) {
@@ -93,9 +109,16 @@ ConsumeWindowsEventLog::ConsumeWindowsEventLog(const std::string& name, utils::I
}
}
+ConsumeWindowsEventLog::~ConsumeWindowsEventLog() {
+ if (xmlDoc_) {
+ xmlDoc_.Release();
+ }
+ CoUninitialize();
+}
+
void ConsumeWindowsEventLog::initialize() {
//! Set the supported properties
- setSupportedProperties({Channel, Query, MaxBufferSize, InactiveDurationToReconnect});
+ setSupportedProperties({Channel, Query, RenderFormatXML, MaxBufferSize, InactiveDurationToReconnect});
//! Set the supported relationships
setSupportedRelationships({Success});
@@ -121,7 +144,7 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
const auto flowFileCount = processQueue(session);
- const auto now = GetTickCount();
+ const auto now = GetTickCount64();
if (flowFileCount > 0) {
lastActivityTimestamp_ = now;
@@ -134,14 +157,80 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
}
}
-bool ConsumeWindowsEventLog::subscribe(const std::shared_ptr<core::ProcessContext> &context)
-{
+void ConsumeWindowsEventLog::createTextOutput(const MSXML2::IXMLDOMElementPtr pRoot, std::wstringstream& stream, std::vector<std::wstring>& ancestors) {
+ const auto pNodeChildren = pRoot->childNodes;
+
+ auto writeAncestors = [](const std::vector<std::wstring>& ancestors, std::wstringstream& stream) {
+ for (size_t j = 0; j < ancestors.size() - 1; j++) {
+ stream << ancestors[j] + L'/';
+ }
+ stream << ancestors.back();
+ };
+
+ if (0 == pNodeChildren->length) {
+ writeAncestors(ancestors, stream);
+
+ stream << std::endl;
+ } else {
+ for (long i = 0; i < pNodeChildren->length; i++) {
+ std::wstringstream curStream;
+
+ const auto pNode = pNodeChildren->item[i];
+
+ const auto nodeType = pNode->GetnodeType();
+
+ if (DOMNodeType::NODE_TEXT == nodeType) {
+ const auto nodeValue = pNode->text;
+ if (nodeValue.length()) {
+ writeAncestors(ancestors, stream);
+
+ std::wstring strNodeValue = static_cast<LPCWSTR>(nodeValue);
+
+ // Remove '\n', '\r' - just substitute all whitespaces with ' '.
+ strNodeValue = std::regex_replace(strNodeValue, std::wregex(L"\\s+"), L" ");
+
+ curStream << L" = " << strNodeValue;
+ }
+
+ stream << curStream.str() << std::endl;
+ } else if (DOMNodeType::NODE_ELEMENT == nodeType) {
+ curStream << pNode->nodeName;
+
+ const auto pAttributes = pNode->attributes;
+ for (long iAttr = 0; iAttr < pAttributes->length; iAttr++) {
+ const auto pAttribute = pAttributes->item[iAttr];
+
+ curStream << L" " << pAttribute->nodeName << L'(' << static_cast<_bstr_t>(pAttribute->nodeValue) << L')';
+ }
+
+ ancestors.emplace_back(curStream.str());
+ createTextOutput(pNode, stream, ancestors);
+ ancestors.pop_back();
+ }
+ }
+ }
+}
+
+bool ConsumeWindowsEventLog::subscribe(const std::shared_ptr<core::ProcessContext> &context) {
std::string channel;
context->getProperty(Channel.getName(), channel);
std::string query;
context->getProperty(Query.getName(), query);
+ context->getProperty(RenderFormatXML.getName(), renderXML_);
+ if (!renderXML_) {
+ HRESULT hr = xmlDoc_.CreateInstance(__uuidof(MSXML2::DOMDocument60));
+ if (FAILED(hr)) {
+ logger_->log_error("!xmlDoc_.CreateInstance %x", hr);
+ return false;
+ }
+
+ xmlDoc_->async = VARIANT_FALSE;
+ xmlDoc_->validateOnParse = VARIANT_FALSE;
+ xmlDoc_->resolveExternals = VARIANT_FALSE;
+ }
+
context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %lld", maxBufferSize_);
@@ -189,11 +278,24 @@ bool ConsumeWindowsEventLog::subscribe(const std::shared_ptr<core::ProcessContex
}
size = used;
- std::vector<char> buf(size);
+ std::vector<wchar_t> buf(size/2 + 1);
if (EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
- std::string xml = std::wstring_convert<std::codecvt_utf8<wchar_t>>().to_bytes(reinterpret_cast<wchar_t*>(&buf[0]));
-
- pConsumeWindowsEventLog->renderedXMLs_.enqueue(std::move(xml));
+ const auto xml = to_string(&buf[0]);
+
+ if (pConsumeWindowsEventLog->renderXML_) {
+ pConsumeWindowsEventLog->listRenderedData_.enqueue(std::move(xml));
+ } else {
+ if (VARIANT_FALSE == pConsumeWindowsEventLog->xmlDoc_->loadXML(_bstr_t(xml.c_str()))) {
+ logger->log_error("'loadXML' failed");
+ return 0UL;
+ }
+
+ std::wstringstream stream;
+ std::vector<std::wstring> ancestors;
+ pConsumeWindowsEventLog->createTextOutput(pConsumeWindowsEventLog->xmlDoc_->documentElement, stream, ancestors);
+
+ pConsumeWindowsEventLog->listRenderedData_.enqueue(to_string(stream.str().c_str()));
+ }
} else {
logger->log_error("EvtRender returned the following error code: %d.", GetLastError());
}
@@ -210,7 +312,7 @@ bool ConsumeWindowsEventLog::subscribe(const std::shared_ptr<core::ProcessContex
return false;
}
- lastActivityTimestamp_ = GetTickCount();
+ lastActivityTimestamp_ = GetTickCount64();
return true;
}
@@ -228,24 +330,19 @@ int ConsumeWindowsEventLog::processQueue(const std::shared_ptr<core::ProcessSess
struct WriteCallback: public OutputStreamCallback {
WriteCallback(const std::string& str)
: str_(str) {
- status_ = 0;
}
int64_t process(std::shared_ptr<io::BaseStream> stream) {
- auto len = stream->writeData((uint8_t*)&str_[0], str_.size());
- if (len < 0)
- status_ = -1;
- return len;
+ return stream->writeData((uint8_t*)&str_[0], str_.size());
}
std::string str_;
- int status_;
};
int flowFileCount = 0;
std::string xml;
- while (renderedXMLs_.try_dequeue(xml)) {
+ while (listRenderedData_.try_dequeue(xml)) {
auto flowFile = session->create();
session->write(flowFile, &WriteCallback(xml));
@@ -264,7 +361,7 @@ void ConsumeWindowsEventLog::notifyStop()
{
unsubscribe();
- if (renderedXMLs_.size_approx() != 0) {
+ if (listRenderedData_.size_approx() != 0) {
auto session = sessionFactory_->createSession();
if (session) {
logger_->log_info("Finishing processing leftover events");
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.h b/extensions/windows-event-log/ConsumeWindowsEventLog.h
index 4a00245..e7372be 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.h
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.h
@@ -26,6 +26,9 @@
#include "core/Processor.h"
#include "core/ProcessSession.h"
#include <winevt.h>
+#include <sstream>
+
+#import <msxml6.dll>
namespace org {
namespace apache {
@@ -44,9 +47,7 @@ public:
ConsumeWindowsEventLog(const std::string& name, utils::Identifier uuid = utils::Identifier());
//! Destructor
- virtual ~ConsumeWindowsEventLog()
- {
- }
+ virtual ~ConsumeWindowsEventLog();
//! Processor Name
static const std::string ProcessorName;
@@ -54,6 +55,7 @@ public:
//! Supported Properties
static core::Property Channel;
static core::Property Query;
+ static core::Property RenderFormatXML;
static core::Property MaxBufferSize;
static core::Property InactiveDurationToReconnect;
@@ -79,11 +81,13 @@ protected:
void unsubscribe();
int processQueue(const std::shared_ptr<core::ProcessSession> &session);
+ void createTextOutput(const MSXML2::IXMLDOMElementPtr pRoot, std::wstringstream& stream, std::vector<std::wstring>& ancestors);
+
void LogWindowsError();
private:
// Logger
std::shared_ptr<logging::Logger> logger_;
- moodycamel::ConcurrentQueue<std::string> renderedXMLs_;
+ moodycamel::ConcurrentQueue<std::string> listRenderedData_;
std::string provenanceUri_;
std::string computerName_;
int64_t inactiveDurationToReconnect_{};
@@ -91,6 +95,8 @@ private:
uint64_t maxBufferSize_{};
DWORD lastActivityTimestamp_{};
std::shared_ptr<core::ProcessSessionFactory> sessionFactory_;
+ bool renderXML_{};
+ MSXML2::IXMLDOMDocumentPtr xmlDoc_;
};
REGISTER_RESOURCE(ConsumeWindowsEventLog, "Windows Event Log Subscribe Callback to receive FlowFiles from Events on Windows.");