You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2019/11/06 15:11:53 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-1043 - FetchOPC
should support reporting changes only
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new f9603ce MINIFICPP-1043 - FetchOPC should support reporting changes only
f9603ce is described below
commit f9603ce5289c129d676337592cd822042c438995
Author: Nghia Le <mi...@gmail.com>
AuthorDate: Tue Oct 29 01:08:10 2019 +0100
MINIFICPP-1043 - FetchOPC should support reporting changes only
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #672
---
extensions/opc/include/fetchopc.h | 5 ++++-
extensions/opc/src/fetchopc.cpp | 37 +++++++++++++++++++++++++++++++------
extensions/opc/src/opc.cpp | 23 +++++++++++++++++++++--
3 files changed, 56 insertions(+), 9 deletions(-)
diff --git a/extensions/opc/include/fetchopc.h b/extensions/opc/include/fetchopc.h
index dc6fbc3..b4377f8 100644
--- a/extensions/opc/include/fetchopc.h
+++ b/extensions/opc/include/fetchopc.h
@@ -22,7 +22,7 @@
#include <memory>
#include <string>
#include <list>
-#include <map>
+#include <unordered_map>
#include <mutex>
#include <thread>
@@ -53,6 +53,7 @@ public:
static core::Property NodeID;
static core::Property NameSpaceIndex;
static core::Property MaxDepth;
+ static core::Property Lazy;
// Supported Relationships
static core::Relationship Success;
@@ -91,10 +92,12 @@ protected:
uint32_t nodesFound_;
uint32_t variablesFound_;
uint64_t maxDepth_;
+ bool lazy_mode_;
private:
std::mutex onTriggerMutex_;
std::vector<UA_NodeId> translatedNodeIDs_; // Only used when user provides path, path->nodeid translation is only done once
+ std::unordered_map<std::string, std::string> node_timestamp_; // Key = Full path, Value = Timestamp
};
diff --git a/extensions/opc/src/fetchopc.cpp b/extensions/opc/src/fetchopc.cpp
index 4efc202..ce29fd7 100644
--- a/extensions/opc/src/fetchopc.cpp
+++ b/extensions/opc/src/fetchopc.cpp
@@ -65,13 +65,21 @@ namespace processors {
->withDescription("Specifiec the max depth of browsing. 0 means unlimited.")
->withDefaultValue<uint64_t>(0)->build());
+ core::Property FetchOPCProcessor::Lazy(
+ core::PropertyBuilder::createProperty("Lazy mode")
+ ->withDescription("Only creates flowfiles from nodes with new timestamp from the server.")
+ ->withDefaultValue<std::string>("Off")
+ ->isRequired(true)
+ ->withAllowableValues<std::string>({"On", "Off"})
+ ->build());
+
core::Relationship FetchOPCProcessor::Success("success", "Successfully retrieved OPC-UA nodes");
core::Relationship FetchOPCProcessor::Failure("failure", "Retrieved OPC-UA nodes where value cannot be extracted (only if enabled)");
void FetchOPCProcessor::initialize() {
// Set the supported properties
- std::set<core::Property> fetchOPCProperties = {OPCServerEndPoint, NodeID, NodeIDType, NameSpaceIndex, MaxDepth};
+ std::set<core::Property> fetchOPCProperties = {OPCServerEndPoint, NodeID, NodeIDType, NameSpaceIndex, MaxDepth, Lazy};
std::set<core::Property> baseOPCProperties = BaseOPCProcessor::getSupportedProperties();
fetchOPCProperties.insert(baseOPCProperties.begin(), baseOPCProperties.end());
setSupportedProperties(fetchOPCProperties);
@@ -126,6 +134,9 @@ namespace processors {
}
}
+ context->getProperty(Lazy.getName(), value);
+ lazy_mode_ = value == "On" ? true : false;
+
configOK_ = true;
}
@@ -178,7 +189,7 @@ namespace processors {
}
}
if(nodesFound_ == 0) {
- logger_->log_warn("Connected to OPC server, but no variable nodes were not found. Configuration might be incorrect! Yielding...");
+ logger_->log_warn("Connected to OPC server, but no variable nodes were found. Configuration might be incorrect! Yielding...");
yield();
} else if (variablesFound_ == 0) {
logger_->log_warn("Found no variables when traversing the specified node. No flowfiles are generated. Yielding...");
@@ -193,12 +204,26 @@ namespace processors {
if(ref->nodeClass == UA_NODECLASS_VARIABLE)
{
try {
- opc::NodeData nodedata = connection_->getNodeData(ref);
- OPCData2FlowFile(nodedata, context, session);
- variablesFound_++;
+ opc::NodeData nodedata = connection_->getNodeData(ref, path);
+ bool write = true;
+ if (lazy_mode_) {
+ write = false;
+ std::string nodeid = nodedata.attributes["Full path"];
+ std::string cur_timestamp = node_timestamp_[nodeid];
+ std::string new_timestamp = nodedata.attributes["Sourcetimestamp"];
+ if (cur_timestamp != new_timestamp) {
+ node_timestamp_[nodeid] = new_timestamp;
+ logger_->log_debug("Node %s has new source timestamp %s", nodeid, new_timestamp);
+ write = true;
+ }
+ }
+ if (write) {
+ OPCData2FlowFile(nodedata, context, session);
+ variablesFound_++;
+ }
} catch (const std::exception& exception) {
std::string browsename((char*)ref->browseName.name.data, ref->browseName.name.length);
- logger_->log_warn("Caught Exception while trying to get data from node &s: %s", path + "/" + browsename, exception.what());
+ logger_->log_warn("Caught Exception while trying to get data from node %s: %s", path + "/" + browsename, exception.what());
}
}
return true;
diff --git a/extensions/opc/src/opc.cpp b/extensions/opc/src/opc.cpp
index 841fd55..2baa8aa 100644
--- a/extensions/opc/src/opc.cpp
+++ b/extensions/opc/src/opc.cpp
@@ -224,6 +224,25 @@ NodeData Client::getNodeData(const UA_ReferenceDescription *ref, const std::stri
nodedata.dataTypeID = UA_TYPES_COUNT;
UA_Variant* var = UA_Variant_new();
if(UA_Client_readValueAttribute(client_, ref->nodeId.nodeId, var) == UA_STATUSCODE_GOOD && var->type != NULL && var->data != NULL) {
+ // Because the timestamps are eliminated in readValueAttribute for simplification
+ // We need to call the inner function UA_Client_Service_read.
+ UA_ReadValueId item;
+ UA_ReadValueId_init(&item);
+ item.nodeId = ref->nodeId.nodeId;
+ item.attributeId = UA_ATTRIBUTEID_VALUE;
+ UA_ReadRequest request;
+ UA_ReadRequest_init(&request);
+ request.nodesToRead = &item;
+ request.nodesToReadSize = 1;
+ // Differ from ua_client_highlevel.c src
+ request.timestampsToReturn = UA_TIMESTAMPSTORETURN_BOTH;
+ UA_ReadResponse response = UA_Client_Service_read(client_, request);
+ UA_DataValue *dv = response.results;
+ auto server_timestamp = OPCDateTime2String(dv->serverTimestamp);
+ auto source_timestamp = OPCDateTime2String(dv->sourceTimestamp);
+ nodedata.attributes["Sourcetimestamp"] = source_timestamp;
+ UA_ReadResponse_deleteMembers(&response);
+
nodedata.dataTypeID = var->type->typeIndex;
nodedata.addVariant(var);
if(var->type->typeName) {
@@ -548,9 +567,9 @@ std::string OPCDateTime2String(UA_DateTime raw_date) {
UA_DateTimeStruct dts = UA_DateTime_toStruct(raw_date);
std::array<char, 100> charBuf;
- snprintf(charBuf.data(), charBuf.size(), "%02hu-%02hu-%04hu %02hu:%02hu:%02hu.%03hu", dts.day, dts.month, dts.year, dts.hour, dts.min, dts.sec, dts.milliSec);
+ int sz = snprintf(charBuf.data(), charBuf.size(), "%02hu-%02hu-%04hu %02hu:%02hu:%02hu.%03hu", dts.day, dts.month, dts.year, dts.hour, dts.min, dts.sec, dts.milliSec);
- return std::string(charBuf.data(), charBuf.size());
+ return std::string(charBuf.data(), sz);
}
void logFunc(void *context, UA_LogLevel level, UA_LogCategory category, const char *msg, va_list args) {