You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2019/06/07 18:17:55 UTC

[nifi-minifi-cpp] branch master updated: minificpp-773 Implemented Text rendering.

This is an automated email from the ASF dual-hosted git repository.

phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 47b0413  minificpp-773 Implemented Text rendering.
47b0413 is described below

commit 47b041396d127ba08e8651a64f39b9c33fa3d593
Author: amarmer <am...@AMARMER-5530-85>
AuthorDate: Tue Jun 4 14:07:00 2019 -0700

    minificpp-773 Implemented Text rendering.
    
    This closes #584.
    
    Signed-off-by: Marc Parisi <ph...@apache.org>
---
 .../windows-event-log/ConsumeWindowsEventLog.cpp   | 131 ++++++++++++++++++---
 .../windows-event-log/ConsumeWindowsEventLog.h     |  14 ++-
 2 files changed, 124 insertions(+), 21 deletions(-)

diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
index afc6d87..fc07e0f 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
@@ -29,6 +29,7 @@
 #include <iostream>
 #include <memory>
 #include <codecvt>
+#include <regex>
 
 #include "io/DataStream.h"
 #include "core/ProcessContext.h"
@@ -42,6 +43,10 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
+static std::string to_string(const wchar_t* pChar) {  // UTF-16 -> UTF-8 (surrogate pairs included); null yields ""
+  return pChar ? std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>>().to_bytes(pChar) : std::string();
+}
+
 const std::string ConsumeWindowsEventLog::ProcessorName("ConsumeWindowsEventLog");
 
 core::Property ConsumeWindowsEventLog::Channel(
@@ -60,6 +65,14 @@ core::Property ConsumeWindowsEventLog::Query(
   supportsExpressionLanguage(true)->
   build());
 
+core::Property ConsumeWindowsEventLog::RenderFormatXML(  // true: enqueue the rendered event XML; false: enqueue flattened text
+  core::PropertyBuilder::createProperty("Render Format XML?")->
+  isRequired(true)->
+  withDefaultValue<bool>(true)->
+  withDescription("Render format XML or Text.")->
+  supportsExpressionLanguage(true)->
+  build());
+
 core::Property ConsumeWindowsEventLog::MaxBufferSize(
   core::PropertyBuilder::createProperty("Max Buffer Size")->
   isRequired(true)->
@@ -84,6 +97,9 @@ core::Relationship ConsumeWindowsEventLog::Success("success", "Relationship for
 
 ConsumeWindowsEventLog::ConsumeWindowsEventLog(const std::string& name, utils::Identifier uuid)
   : core::Processor(name, uuid), logger_(logging::LoggerFactory<ConsumeWindowsEventLog>::getLogger()) {
+  // Initializes COM for the current thread; the MSXML parser requires it.
+  CoInitializeEx(0, COINIT_APARTMENTTHREADED);
+
   char buff[MAX_COMPUTERNAME_LENGTH + 1];
   DWORD size = sizeof(buff);
   if (GetComputerName(buff, &size)) {
@@ -93,9 +109,16 @@ ConsumeWindowsEventLog::ConsumeWindowsEventLog(const std::string& name, utils::I
   }
 }
 
+ConsumeWindowsEventLog::~ConsumeWindowsEventLog() {
+  if (xmlDoc_) {  // the document is only created by subscribe() when text rendering is selected
+    xmlDoc_.Release();  // released explicitly so it happens before CoUninitialize() below
+  }
+  CoUninitialize();  // counterpart of the CoInitializeEx call in the constructor
+}
+
 void ConsumeWindowsEventLog::initialize() {
   //! Set the supported properties
-  setSupportedProperties({Channel, Query, MaxBufferSize, InactiveDurationToReconnect});
+  setSupportedProperties({Channel, Query, RenderFormatXML, MaxBufferSize, InactiveDurationToReconnect});
 
   //! Set the supported relationships
   setSupportedRelationships({Success});
@@ -121,7 +144,7 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
 
   const auto flowFileCount = processQueue(session);
 
-  const auto now = GetTickCount();
+  const auto now = GetTickCount64();
 
   if (flowFileCount > 0) {
     lastActivityTimestamp_ = now;
@@ -134,14 +157,80 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
   }
 }
 
-bool ConsumeWindowsEventLog::subscribe(const std::shared_ptr<core::ProcessContext> &context)
-{
+// Flattens the DOM subtree under pRoot into `stream`: each text node becomes a line
+// "label/label/... = value", with whitespace runs in the value collapsed to one space.
+// `ancestors` is the label path (element name plus " attr(value)" pairs) built up by
+// the recursion; every push is popped again, so it is unchanged on return.
+void ConsumeWindowsEventLog::createTextOutput(const MSXML2::IXMLDOMElementPtr pRoot, std::wstringstream& stream, std::vector<std::wstring>& ancestors) {
+  // Writes the labels joined by '/'. An empty path writes nothing; previously this
+  // evaluated size() - 1 and back() on an empty vector, which is undefined behaviour.
+  auto writeAncestors = [](const std::vector<std::wstring>& path, std::wstringstream& out) {
+    for (size_t j = 0; j < path.size(); j++) {
+      out << (j ? L"/" : L"") << path[j];
+    }
+  };
+
+  const auto pNodeChildren = pRoot->childNodes;
+  const long childCount = pNodeChildren->length;
+  if (0 == childCount) {
+    // Childless element: emit only its path.
+    writeAncestors(ancestors, stream);
+    stream << std::endl;
+    return;
+  }
+
+  for (long i = 0; i < childCount; i++) {
+    const auto pNode = pNodeChildren->item[i];
+    const auto nodeType = pNode->GetnodeType();
+
+    if (DOMNodeType::NODE_TEXT == nodeType) {
+      const auto nodeValue = pNode->text;
+      if (nodeValue.length()) {
+        writeAncestors(ancestors, stream);
+        // Remove '\n', '\r' - just substitute all whitespaces with ' '.
+        const std::wstring strNodeValue = static_cast<LPCWSTR>(nodeValue);
+        stream << L" = " << std::regex_replace(strNodeValue, std::wregex(L"\\s+"), L" ");
+      }
+      // As before, a line break is written for every text node, even an empty one.
+      stream << std::endl;
+    } else if (DOMNodeType::NODE_ELEMENT == nodeType) {
+      // Label = element name followed by " attr(value)" for each attribute.
+      std::wstringstream label;
+      label << pNode->nodeName;
+
+      const auto pAttributes = pNode->attributes;
+      for (long iAttr = 0; iAttr < pAttributes->length; iAttr++) {
+        const auto pAttribute = pAttributes->item[iAttr];
+        label << L" " << pAttribute->nodeName << L'(' << static_cast<_bstr_t>(pAttribute->nodeValue) << L')';
+      }
+
+      ancestors.emplace_back(label.str());
+      createTextOutput(pNode, stream, ancestors);
+      ancestors.pop_back();
+    }
+  }
+}
+
+bool ConsumeWindowsEventLog::subscribe(const std::shared_ptr<core::ProcessContext> &context) {
   std::string channel;
   context->getProperty(Channel.getName(), channel);
 
   std::string query;
   context->getProperty(Query.getName(), query);
 
+  context->getProperty(RenderFormatXML.getName(), renderXML_);
+  if (!renderXML_) {
+    HRESULT hr = xmlDoc_.CreateInstance(__uuidof(MSXML2::DOMDocument60));
+    if (FAILED(hr)) {
+      logger_->log_error("!xmlDoc_.CreateInstance %x", hr);
+      return false;
+    }
+
+    xmlDoc_->async = VARIANT_FALSE;
+    xmlDoc_->validateOnParse = VARIANT_FALSE;
+    xmlDoc_->resolveExternals = VARIANT_FALSE;
+  }
+
   context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
   logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %lld", maxBufferSize_);
 
@@ -189,11 +278,24 @@ bool ConsumeWindowsEventLog::subscribe(const std::shared_ptr<core::ProcessContex
               }
 
               size = used;
-              std::vector<char> buf(size);
+              std::vector<wchar_t> buf(size/2 + 1);
               if (EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
-                std::string xml = std::wstring_convert<std::codecvt_utf8<wchar_t>>().to_bytes(reinterpret_cast<wchar_t*>(&buf[0]));
-
-                pConsumeWindowsEventLog->renderedXMLs_.enqueue(std::move(xml));
+                const auto xml = to_string(&buf[0]);
+
+                if (pConsumeWindowsEventLog->renderXML_) {
+                  pConsumeWindowsEventLog->listRenderedData_.enqueue(std::move(xml));
+                } else {
+                  if (VARIANT_FALSE == pConsumeWindowsEventLog->xmlDoc_->loadXML(_bstr_t(xml.c_str()))) {
+                    logger->log_error("'loadXML' failed");
+                    return 0UL;
+                  }
+
+                  std::wstringstream stream;
+                  std::vector<std::wstring> ancestors;
+                  pConsumeWindowsEventLog->createTextOutput(pConsumeWindowsEventLog->xmlDoc_->documentElement, stream, ancestors);
+
+                  pConsumeWindowsEventLog->listRenderedData_.enqueue(to_string(stream.str().c_str()));
+                }
               } else {
                 logger->log_error("EvtRender returned the following error code: %d.", GetLastError());
               }
@@ -210,7 +312,7 @@ bool ConsumeWindowsEventLog::subscribe(const std::shared_ptr<core::ProcessContex
     return false;
   }
 
-  lastActivityTimestamp_ = GetTickCount();
+  lastActivityTimestamp_ = GetTickCount64();
 
   return true;
 }
@@ -228,24 +330,19 @@ int ConsumeWindowsEventLog::processQueue(const std::shared_ptr<core::ProcessSess
   struct WriteCallback: public OutputStreamCallback {
     WriteCallback(const std::string& str)
       : str_(str) {
-      status_ = 0;
     }
 
     int64_t process(std::shared_ptr<io::BaseStream> stream) {
-      auto len = stream->writeData((uint8_t*)&str_[0], str_.size());
-      if (len < 0)
-        status_ = -1;
-      return len;
+      return stream->writeData((uint8_t*)&str_[0], str_.size());
     }
 
     std::string str_;
-    int status_;
   };
 
   int flowFileCount = 0;
 
   std::string xml;
-  while (renderedXMLs_.try_dequeue(xml)) {
+  while (listRenderedData_.try_dequeue(xml)) {
     auto flowFile = session->create();
 
     session->write(flowFile, &WriteCallback(xml));
@@ -264,7 +361,7 @@ void ConsumeWindowsEventLog::notifyStop()
 {
   unsubscribe();
 
-  if (renderedXMLs_.size_approx() != 0) {
+  if (listRenderedData_.size_approx() != 0) {
     auto session = sessionFactory_->createSession();
     if (session) {
       logger_->log_info("Finishing processing leftover events");
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.h b/extensions/windows-event-log/ConsumeWindowsEventLog.h
index 4a00245..e7372be 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.h
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.h
@@ -26,6 +26,9 @@
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
 #include <winevt.h>
+#include <sstream>
+
+#import <msxml6.dll>
 
 namespace org {
 namespace apache {
@@ -44,9 +47,7 @@ public:
   ConsumeWindowsEventLog(const std::string& name, utils::Identifier uuid = utils::Identifier());
 
   //! Destructor
-  virtual ~ConsumeWindowsEventLog()
-  {
-  }
+  virtual ~ConsumeWindowsEventLog();
 
   //! Processor Name
   static const std::string ProcessorName;
@@ -54,6 +55,7 @@ public:
   //! Supported Properties
   static core::Property Channel;
   static core::Property Query;
+  static core::Property RenderFormatXML;
   static core::Property MaxBufferSize;
   static core::Property InactiveDurationToReconnect;
 
@@ -79,11 +81,13 @@ protected:
   void unsubscribe();
   int processQueue(const std::shared_ptr<core::ProcessSession> &session);
 
+  void createTextOutput(const MSXML2::IXMLDOMElementPtr pRoot, std::wstringstream& stream, std::vector<std::wstring>& ancestors);
+
   void LogWindowsError();
 private:
   // Logger
   std::shared_ptr<logging::Logger> logger_;
-  moodycamel::ConcurrentQueue<std::string> renderedXMLs_;
+  moodycamel::ConcurrentQueue<std::string> listRenderedData_;
   std::string provenanceUri_;
   std::string computerName_;
   int64_t inactiveDurationToReconnect_{};
@@ -91,6 +95,8 @@ private:
   uint64_t maxBufferSize_{};
   DWORD lastActivityTimestamp_{};
   std::shared_ptr<core::ProcessSessionFactory> sessionFactory_;
+  bool renderXML_{};
+  MSXML2::IXMLDOMDocumentPtr xmlDoc_;
 };
 
 REGISTER_RESOURCE(ConsumeWindowsEventLog, "Windows Event Log Subscribe Callback to receive FlowFiles from Events on Windows.");