You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2019/11/06 15:11:53 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-1043 - FetchOPC should support reporting changes only

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new f9603ce  MINIFICPP-1043 - FetchOPC should support reporting changes only
f9603ce is described below

commit f9603ce5289c129d676337592cd822042c438995
Author: Nghia Le <mi...@gmail.com>
AuthorDate: Tue Oct 29 01:08:10 2019 +0100

    MINIFICPP-1043 - FetchOPC should support reporting changes only
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #672
---
 extensions/opc/include/fetchopc.h |  5 ++++-
 extensions/opc/src/fetchopc.cpp   | 37 +++++++++++++++++++++++++++++++------
 extensions/opc/src/opc.cpp        | 23 +++++++++++++++++++++--
 3 files changed, 56 insertions(+), 9 deletions(-)

diff --git a/extensions/opc/include/fetchopc.h b/extensions/opc/include/fetchopc.h
index dc6fbc3..b4377f8 100644
--- a/extensions/opc/include/fetchopc.h
+++ b/extensions/opc/include/fetchopc.h
@@ -22,7 +22,7 @@
 #include <memory>
 #include <string>
 #include <list>
-#include <map>
+#include <unordered_map>
 #include <mutex>
 #include <thread>
 
@@ -53,6 +53,7 @@ public:
   static core::Property NodeID;
   static core::Property NameSpaceIndex;
   static core::Property MaxDepth;
+  static core::Property Lazy;
 
   // Supported Relationships
   static core::Relationship Success;
@@ -91,10 +92,12 @@ protected:
   uint32_t nodesFound_;
   uint32_t variablesFound_;
   uint64_t maxDepth_;
+  bool lazy_mode_;
 
 private:
   std::mutex onTriggerMutex_;
   std::vector<UA_NodeId> translatedNodeIDs_;  // Only used when user provides path, path->nodeid translation is only done once
+  std::unordered_map<std::string, std::string> node_timestamp_; // Key = Full path, Value = Timestamp
 
 };
 
diff --git a/extensions/opc/src/fetchopc.cpp b/extensions/opc/src/fetchopc.cpp
index 4efc202..ce29fd7 100644
--- a/extensions/opc/src/fetchopc.cpp
+++ b/extensions/opc/src/fetchopc.cpp
@@ -65,13 +65,21 @@ namespace processors {
       ->withDescription("Specifiec the max depth of browsing. 0 means unlimited.")
       ->withDefaultValue<uint64_t>(0)->build());
 
+  core::Property FetchOPCProcessor::Lazy(
+      core::PropertyBuilder::createProperty("Lazy mode")
+      ->withDescription("Only creates flowfiles from nodes with new timestamp from the server.")
+      ->withDefaultValue<std::string>("Off")
+      ->isRequired(true)
+      ->withAllowableValues<std::string>({"On", "Off"})
+      ->build());
+
   core::Relationship FetchOPCProcessor::Success("success", "Successfully retrieved OPC-UA nodes");
   core::Relationship FetchOPCProcessor::Failure("failure", "Retrieved OPC-UA nodes where value cannot be extracted (only if enabled)");
 
 
   void FetchOPCProcessor::initialize() {
     // Set the supported properties
-    std::set<core::Property> fetchOPCProperties = {OPCServerEndPoint, NodeID, NodeIDType, NameSpaceIndex, MaxDepth};
+    std::set<core::Property> fetchOPCProperties = {OPCServerEndPoint, NodeID, NodeIDType, NameSpaceIndex, MaxDepth, Lazy};
     std::set<core::Property> baseOPCProperties = BaseOPCProcessor::getSupportedProperties();
     fetchOPCProperties.insert(baseOPCProperties.begin(), baseOPCProperties.end());
     setSupportedProperties(fetchOPCProperties);
@@ -126,6 +134,9 @@ namespace processors {
       }
     }
 
+    context->getProperty(Lazy.getName(), value);
+    lazy_mode_ = value == "On" ? true : false;
+
     configOK_ = true;
   }
 
@@ -178,7 +189,7 @@ namespace processors {
       }
     }
     if(nodesFound_ == 0) {
-      logger_->log_warn("Connected to OPC server, but no variable nodes were not found. Configuration might be incorrect! Yielding...");
+      logger_->log_warn("Connected to OPC server, but no variable nodes were found. Configuration might be incorrect! Yielding...");
       yield();
     } else if (variablesFound_ == 0) {
       logger_->log_warn("Found no variables when traversing the specified node. No flowfiles are generated. Yielding...");
@@ -193,12 +204,26 @@ namespace processors {
     if(ref->nodeClass == UA_NODECLASS_VARIABLE)
     {
       try {
-        opc::NodeData nodedata = connection_->getNodeData(ref);
-        OPCData2FlowFile(nodedata, context, session);
-        variablesFound_++;
+        opc::NodeData nodedata = connection_->getNodeData(ref, path);
+        bool write = true;
+        if (lazy_mode_) {
+          write = false;
+          std::string nodeid = nodedata.attributes["Full path"];
+          std::string cur_timestamp = node_timestamp_[nodeid];
+          std::string new_timestamp = nodedata.attributes["Sourcetimestamp"];
+          if (cur_timestamp != new_timestamp) {
+            node_timestamp_[nodeid] = new_timestamp;
+            logger_->log_debug("Node %s has new source timestamp %s", nodeid, new_timestamp);
+            write = true;
+          }
+        }
+        if (write) {
+          OPCData2FlowFile(nodedata, context, session);
+          variablesFound_++;
+        }
       } catch (const std::exception& exception) {
         std::string browsename((char*)ref->browseName.name.data, ref->browseName.name.length);
-        logger_->log_warn("Caught Exception while trying to get data from node &s: %s", path + "/" + browsename,  exception.what());
+        logger_->log_warn("Caught Exception while trying to get data from node %s: %s", path + "/" + browsename,  exception.what());
       }
     }
     return true;
diff --git a/extensions/opc/src/opc.cpp b/extensions/opc/src/opc.cpp
index 841fd55..2baa8aa 100644
--- a/extensions/opc/src/opc.cpp
+++ b/extensions/opc/src/opc.cpp
@@ -224,6 +224,25 @@ NodeData Client::getNodeData(const UA_ReferenceDescription *ref, const std::stri
     nodedata.dataTypeID = UA_TYPES_COUNT;
     UA_Variant* var = UA_Variant_new();
     if(UA_Client_readValueAttribute(client_, ref->nodeId.nodeId, var) == UA_STATUSCODE_GOOD && var->type != NULL && var->data != NULL) {
+      // UA_Client_readValueAttribute discards the timestamps for simplicity,
+      // so we call the underlying UA_Client_Service_read to retrieve them.
+      UA_ReadValueId item;
+      UA_ReadValueId_init(&item);
+      item.nodeId = ref->nodeId.nodeId;
+      item.attributeId = UA_ATTRIBUTEID_VALUE;
+      UA_ReadRequest request;
+      UA_ReadRequest_init(&request);
+      request.nodesToRead = &item;
+      request.nodesToReadSize = 1;
+      // Differs from the ua_client_highlevel.c source: request both timestamps.
+      request.timestampsToReturn = UA_TIMESTAMPSTORETURN_BOTH;
+      UA_ReadResponse response = UA_Client_Service_read(client_, request);
+      UA_DataValue *dv = response.results;
+      auto server_timestamp = OPCDateTime2String(dv->serverTimestamp);
+      auto source_timestamp = OPCDateTime2String(dv->sourceTimestamp);
+      nodedata.attributes["Sourcetimestamp"] = source_timestamp;
+      UA_ReadResponse_deleteMembers(&response);
+
       nodedata.dataTypeID = var->type->typeIndex;
       nodedata.addVariant(var);
       if(var->type->typeName) {
@@ -548,9 +567,9 @@ std::string OPCDateTime2String(UA_DateTime raw_date) {
   UA_DateTimeStruct dts = UA_DateTime_toStruct(raw_date);
   std::array<char, 100> charBuf;
 
-  snprintf(charBuf.data(), charBuf.size(), "%02hu-%02hu-%04hu %02hu:%02hu:%02hu.%03hu", dts.day, dts.month, dts.year, dts.hour, dts.min, dts.sec, dts.milliSec);
+  int sz = snprintf(charBuf.data(), charBuf.size(), "%02hu-%02hu-%04hu %02hu:%02hu:%02hu.%03hu", dts.day, dts.month, dts.year, dts.hour, dts.min, dts.sec, dts.milliSec);
 
-  return std::string(charBuf.data(), charBuf.size());
+  return std::string(charBuf.data(), sz);
 }
 
 void logFunc(void *context, UA_LogLevel level, UA_LogCategory category, const char *msg, va_list args) {