You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/09/20 15:15:01 UTC
[6/9] nifi-minifi-cpp git commit: MINIFICPP-595: Provide basic
support for windows. MINIFICPP-32: Add windows event log reader
MINIFICPP-596: Build core and libminifi artifacts. Must abstract features
that are operating system dependent, such as uuid
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/processors/ListenSyslog.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/ListenSyslog.h b/libminifi/include/processors/ListenSyslog.h
index f9dc678..69223c4 100644
--- a/libminifi/include/processors/ListenSyslog.h
+++ b/libminifi/include/processors/ListenSyslog.h
@@ -21,14 +21,17 @@
#define __LISTEN_SYSLOG_H__
#include <stdio.h>
-#include <unistd.h>
#include <sys/types.h>
+#ifndef WIN32
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
-#include <errno.h>
#include <sys/select.h>
#include <sys/time.h>
+#else
+#include <WinSock2.h>
+#endif
+#include <errno.h>
#include <sys/types.h>
#include <chrono>
#include <thread>
@@ -39,12 +42,15 @@
#include "core/Resource.h"
#include "core/logging/LoggerConfiguration.h"
+#ifndef WIN32
+
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
+
// SyslogEvent
typedef struct {
char *payload;
@@ -58,7 +64,7 @@ class ListenSyslog : public core::Processor {
/*!
* Create a new processor
*/
- ListenSyslog(std::string name, uuid_t uuid = NULL)
+ ListenSyslog(std::string name, utils::Identifier uuid = utils::Identifier())
: Processor(name, uuid),
logger_(logging::LoggerFactory<ListenSyslog>::getLogger()) {
_eventQueueByteSize = 0;
@@ -216,3 +222,5 @@ REGISTER_RESOURCE(ListenSyslog);
} /* namespace org */
#endif
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/processors/LogAttribute.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/LogAttribute.h b/libminifi/include/processors/LogAttribute.h
index b9e333f..412e068 100644
--- a/libminifi/include/processors/LogAttribute.h
+++ b/libminifi/include/processors/LogAttribute.h
@@ -40,7 +40,7 @@ class LogAttribute : public core::Processor {
/*!
* Create a new processor
*/
- LogAttribute(std::string name, uuid_t uuid = NULL)
+ LogAttribute(std::string name, utils::Identifier uuid = utils::Identifier())
: Processor(name, uuid),
logger_(logging::LoggerFactory<LogAttribute>::getLogger()) {
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/processors/PutFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/PutFile.h b/libminifi/include/processors/PutFile.h
index ddcdac1..ba410d5 100644
--- a/libminifi/include/processors/PutFile.h
+++ b/libminifi/include/processors/PutFile.h
@@ -46,7 +46,7 @@ class PutFile : public core::Processor {
/*!
* Create a new processor
*/
- PutFile(std::string name, uuid_t uuid = NULL)
+ PutFile(std::string name, utils::Identifier uuid = utils::Identifier())
: core::Processor(name, uuid),
logger_(logging::LoggerFactory<PutFile>::getLogger()) {
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/processors/RouteOnAttribute.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/RouteOnAttribute.h b/libminifi/include/processors/RouteOnAttribute.h
index 1c1eb45..0737f8f 100644
--- a/libminifi/include/processors/RouteOnAttribute.h
+++ b/libminifi/include/processors/RouteOnAttribute.h
@@ -36,7 +36,7 @@ namespace processors {
class RouteOnAttribute : public core::Processor {
public:
- RouteOnAttribute(std::string name, uuid_t uuid = NULL)
+ RouteOnAttribute(std::string name, utils::Identifier uuid = utils::Identifier())
: core::Processor(name, uuid),
logger_(logging::LoggerFactory<RouteOnAttribute>::getLogger()) {
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/processors/TailFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/TailFile.h b/libminifi/include/processors/TailFile.h
index fd59cb5..a98e990 100644
--- a/libminifi/include/processors/TailFile.h
+++ b/libminifi/include/processors/TailFile.h
@@ -40,7 +40,7 @@ class TailFile : public core::Processor {
/*!
* Create a new processor
*/
- explicit TailFile(std::string name, uuid_t uuid = NULL)
+ explicit TailFile(std::string name, utils::Identifier uuid = utils::Identifier())
: core::Processor(name, uuid),
logger_(logging::LoggerFactory<TailFile>::getLogger()) {
_stateRecovered = false;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/processors/UpdateAttribute.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/UpdateAttribute.h b/libminifi/include/processors/UpdateAttribute.h
index 117c78a..a768a87 100644
--- a/libminifi/include/processors/UpdateAttribute.h
+++ b/libminifi/include/processors/UpdateAttribute.h
@@ -36,7 +36,7 @@ namespace processors {
class UpdateAttribute : public core::Processor {
public:
- UpdateAttribute(std::string name, uuid_t uuid = NULL)
+ UpdateAttribute(std::string name, utils::Identifier uuid = utils::Identifier())
: core::Processor(name, uuid),
logger_(logging::LoggerFactory<UpdateAttribute>::getLogger()) {
}
@@ -64,7 +64,7 @@ class UpdateAttribute : public core::Processor {
private:
std::shared_ptr<logging::Logger> logger_;
- std::vector<std::string> attributes_;
+ std::vector<core::Property> attributes_;
};
REGISTER_RESOURCE(UpdateAttribute);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/provenance/Provenance.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h
index 72fd379..5ec33fa 100644
--- a/libminifi/include/provenance/Provenance.h
+++ b/libminifi/include/provenance/Provenance.h
@@ -18,7 +18,6 @@
#ifndef __PROVENANCE_H__
#define __PROVENANCE_H__
-#include <ftw.h>
#include <uuid/uuid.h>
#include <atomic>
#include <cstdint>
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/sitetosite/Peer.h
----------------------------------------------------------------------
diff --git a/libminifi/include/sitetosite/Peer.h b/libminifi/include/sitetosite/Peer.h
index 3775c7c..2c9e2a0 100644
--- a/libminifi/include/sitetosite/Peer.h
+++ b/libminifi/include/sitetosite/Peer.h
@@ -19,9 +19,6 @@
#define LIBMINIFI_INCLUDE_SITETOSITE_PEER_H_
#include <stdio.h>
-#include <fcntl.h>
-#include <resolv.h>
-#include <netdb.h>
#include <string>
#include <errno.h>
#include <uuid/uuid.h>
@@ -36,6 +33,7 @@
#include "properties/Configure.h"
#include "io/ClientSocket.h"
#include "io/BaseStream.h"
+#include "io/DataStream.h"
#include "utils/TimeUtil.h"
#include "utils/HTTPClient.h"
@@ -47,11 +45,11 @@ namespace sitetosite {
class Peer {
public:
- explicit Peer(uuid_t port_id, const std::string &host, uint16_t port, bool secure = false)
+ explicit Peer(utils::Identifier &port_id, const std::string &host, uint16_t port, bool secure = false)
: host_(host),
port_(port),
secure_(secure) {
- uuid_copy(port_id_, port_id);
+ port_id_ = port_id;
}
explicit Peer(const std::string &host, uint16_t port, bool secure = false)
@@ -64,14 +62,14 @@ class Peer {
: host_(other.host_),
port_(other.port_),
secure_(other.secure_) {
- uuid_copy(port_id_, other.port_id_);
+ port_id_ = other.port_id_;
}
- explicit Peer(const Peer &&other)
+ explicit Peer(Peer &&other)
: host_(std::move(other.host_)),
port_(std::move(other.port_)),
secure_(std::move(other.secure_)) {
- uuid_copy(port_id_, other.port_id_);
+ port_id_ = other.port_id_;
}
uint16_t getPort() const {
@@ -86,8 +84,8 @@ class Peer {
return secure_;
}
- void getPortId(uuid_t other) const {
- uuid_copy(other, port_id_);
+ void getPortId(utils::Identifier &other) const {
+ other = port_id_;
}
protected:
@@ -95,7 +93,7 @@ class Peer {
uint16_t port_;
- uuid_t port_id_;
+ utils::Identifier port_id_;
// secore comms
@@ -149,12 +147,12 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
/*
* Create a new site2site peer
*/
- explicit SiteToSitePeer(std::unique_ptr<org::apache::nifi::minifi::io::DataStream> injected_socket, const std::string host, uint16_t port, const std::string &interface)
- : SiteToSitePeer(host, port, interface) {
+ explicit SiteToSitePeer(std::unique_ptr<org::apache::nifi::minifi::io::DataStream> injected_socket, const std::string host, uint16_t port, const std::string &ifc)
+ : SiteToSitePeer(host, port, ifc) {
stream_ = std::move(injected_socket);
}
- explicit SiteToSitePeer(const std::string &host, uint16_t port, const std::string &interface)
+ explicit SiteToSitePeer(const std::string &host, uint16_t port, const std::string &ifc)
: stream_(nullptr),
host_(host),
port_(port),
@@ -164,7 +162,7 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
url_ = "nifi://" + host_ + ":" + std::to_string(port_);
yield_expiration_ = 0;
timeout_ = 30000; // 30 seconds
- local_network_interface_= std::move(io::NetworkInterface(interface, nullptr));
+ local_network_interface_= std::move(io::NetworkInterface(ifc, nullptr));
}
explicit SiteToSitePeer(SiteToSitePeer &&ss)
@@ -191,8 +189,8 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
return url_;
}
// setInterface
- void setInterface(std::string &interface) {
- local_network_interface_ = std::move(io::NetworkInterface(interface,nullptr));
+ void setInterface(std::string &ifc) {
+ local_network_interface_ = std::move(io::NetworkInterface(ifc,nullptr));
}
std::string getInterface() {
return local_network_interface_.getInterface();
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/sitetosite/RawSocketProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/sitetosite/RawSocketProtocol.h b/libminifi/include/sitetosite/RawSocketProtocol.h
index 7a075bf..0c7feb5 100644
--- a/libminifi/include/sitetosite/RawSocketProtocol.h
+++ b/libminifi/include/sitetosite/RawSocketProtocol.h
@@ -21,13 +21,7 @@
#define __SITE2SITE_CLIENT_PROTOCOL_H__
#include <stdio.h>
-#include <unistd.h>
#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <fcntl.h>
-#include <netdb.h>
#include <string>
#include <errno.h>
#include <chrono>
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/sitetosite/SiteToSite.h
----------------------------------------------------------------------
diff --git a/libminifi/include/sitetosite/SiteToSite.h b/libminifi/include/sitetosite/SiteToSite.h
index 484a277..2cc0823 100644
--- a/libminifi/include/sitetosite/SiteToSite.h
+++ b/libminifi/include/sitetosite/SiteToSite.h
@@ -245,8 +245,7 @@ class Transaction {
// Generate the global UUID for the transaction
id_generator_->generate(uuid_);
- uuid_unparse_lower(uuid_, uuidStr);
- uuid_str_ = uuidStr;
+ uuid_str_ = uuid_.to_string();
}
// Destructor
virtual ~Transaction() {
@@ -262,7 +261,7 @@ class Transaction {
void setUUIDStr(const std::string &str) {
uuid_str_ = str;
- uuid_parse(str.c_str(), uuid_);
+ uuid_ = str;
}
// getState
@@ -323,7 +322,7 @@ class Transaction {
TransferDirection _direction;
// A global unique identifier
- uuid_t uuid_;
+ utils::Identifier uuid_;
// UUID string
std::string uuid_str_;
@@ -332,10 +331,10 @@ class Transaction {
class SiteToSiteClientConfiguration {
public:
- SiteToSiteClientConfiguration(std::shared_ptr<io::StreamFactory> stream_factory, const std::shared_ptr<Peer> &peer, const std::string &interface, CLIENT_TYPE type = RAW)
+ SiteToSiteClientConfiguration(std::shared_ptr<io::StreamFactory> stream_factory, const std::shared_ptr<Peer> &peer, const std::string &ifc, CLIENT_TYPE type = RAW)
: stream_factory_(stream_factory),
peer_(peer),
- local_network_interface_(interface),
+ local_network_interface_(ifc),
ssl_service_(nullptr) {
client_type_ = type;
}
@@ -363,8 +362,8 @@ class SiteToSiteClientConfiguration {
}
// setInterface
- void setInterface(std::string &interface) {
- local_network_interface_ = interface;
+ void setInterface(std::string &ifc) {
+ local_network_interface_ = ifc;
}
std::string getInterface() const {
return local_network_interface_;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/sitetosite/SiteToSiteClient.h
----------------------------------------------------------------------
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h
index 259b95e..6469e82 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -56,7 +56,7 @@ class SiteToSiteClient : public core::Connectable {
public:
SiteToSiteClient()
- : core::Connectable("SitetoSiteClient", 0),
+ : core::Connectable("SitetoSiteClient"),
peer_state_(IDLE),
_batchSendNanos(5000000000),
ssl_context_service_(nullptr),
@@ -96,11 +96,20 @@ class SiteToSiteClient : public core::Connectable {
* @returns true if the process succeeded, failure OR exception thrown otherwise
*/
virtual bool transfer(TransferDirection direction, const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
- if (__builtin_expect(direction == SEND, 1)) {
+#ifndef WIN32
+ if (__builtin_expect(direction == SEND, 1)) {
return transferFlowFiles(context, session);
} else {
return receiveFlowFiles(context, session);
}
+#else
+ if (direction == SEND) {
+ return transferFlowFiles(context, session);
+ }
+ else {
+ return receiveFlowFiles(context, session);
+ }
+#endif
}
/**
@@ -136,11 +145,9 @@ class SiteToSiteClient : public core::Connectable {
virtual bool transmitPayload(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session, const std::string &payload,
std::map<std::string, std::string> attributes) = 0;
- void setPortId(uuid_t id) {
- uuid_copy(port_id_, id);
- char idStr[37];
- uuid_unparse_lower(id, idStr);
- port_id_str_ = idStr;
+ void setPortId(utils::Identifier &id) {
+ port_id_ = id;
+ port_id_str_ = port_id_.to_string();
}
/**
@@ -241,7 +248,7 @@ class SiteToSiteClient : public core::Connectable {
std::string port_id_str_;
// portId
- uuid_t port_id_;
+ utils::Identifier port_id_;
// Peer Connection
std::unique_ptr<SiteToSitePeer> peer_;
@@ -285,7 +292,7 @@ class WriteCallback : public OutputStreamCallback {
int len = _packet->_size;
int total = 0;
while (len > 0) {
- int size = std::min(len, 16384);
+ int size = len < 16384 ? len : 16384;
int ret = _packet->transaction_->getStream().readData(buffer, size);
if (ret != size) {
logging::LOG_ERROR(_packet->logger_reference_) << "Site2Site Receive Flow Size " << size << " Failed " << ret << ", should have received " << len;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/sitetosite/SiteToSiteFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/sitetosite/SiteToSiteFactory.h b/libminifi/include/sitetosite/SiteToSiteFactory.h
index 4f0f36c..2d1dd74 100644
--- a/libminifi/include/sitetosite/SiteToSiteFactory.h
+++ b/libminifi/include/sitetosite/SiteToSiteFactory.h
@@ -58,7 +58,7 @@ static std::unique_ptr<SiteToSitePeer> createStreamingPeer(const SiteToSiteClien
* RawSiteToSiteClient will be instantiated and returned through a unique ptr.
*/
static std::unique_ptr<SiteToSiteClient> createRawSocket(const SiteToSiteClientConfiguration &client_configuration) {
- uuid_t uuid;
+ utils::Identifier uuid;
client_configuration.getPeer()->getPortId(uuid);
auto rsptr = createStreamingPeer(client_configuration);
if (nullptr == rsptr){
@@ -77,7 +77,7 @@ static std::unique_ptr<SiteToSiteClient> createRawSocket(const SiteToSiteClientC
* @returns site to site client or nullptr.
*/
static std::unique_ptr<SiteToSiteClient> createClient(const SiteToSiteClientConfiguration &client_configuration) {
- uuid_t uuid;
+ utils::Identifier uuid;
client_configuration.getPeer()->getPortId(uuid);
switch (client_configuration.getClientType()) {
case RAW:
@@ -90,8 +90,7 @@ static std::unique_ptr<SiteToSiteClient> createClient(const SiteToSiteClientConf
auto peer = std::unique_ptr<SiteToSitePeer>(new SiteToSitePeer(client_configuration.getPeer()->getHost(), client_configuration.getPeer()->getPort(),
client_configuration.getInterface()));
peer->setHTTPProxy(client_configuration.getHTTPProxy());
- char idStr[37];
- uuid_unparse_lower(uuid, idStr);
+
ptr->setPortId(uuid);
ptr->setPeer(std::move(peer));
return ptr;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/utils/HTTPClient.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/HTTPClient.h b/libminifi/include/utils/HTTPClient.h
index 553dcea..87bf659 100644
--- a/libminifi/include/utils/HTTPClient.h
+++ b/libminifi/include/utils/HTTPClient.h
@@ -198,7 +198,7 @@ class HTTPRequestResponse {
};
class BaseHTTPClient {
- public:
+public:
explicit BaseHTTPClient(const std::string &url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service = nullptr) {
response_code = -1;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/utils/Id.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/Id.h b/libminifi/include/utils/Id.h
index d9f0811..46537be 100644
--- a/libminifi/include/utils/Id.h
+++ b/libminifi/include/utils/Id.h
@@ -17,10 +17,11 @@
#ifndef LIBMINIFI_INCLUDE_UTILS_ID_H_
#define LIBMINIFI_INCLUDE_UTILS_ID_H_
+#include <cstddef>
#include <atomic>
#include <memory>
#include <string>
-#include <uuid/uuid.h>
+#include "uuid/uuid.h"
#include "core/logging/Logger.h"
#include "properties/Properties.h"
@@ -37,9 +38,91 @@ namespace nifi {
namespace minifi {
namespace utils {
+template<typename T, typename C>  // T: raw identifier storage type (id_); C: type handed back by convert()
+class IdentifierBase {  // Couples a raw identifier (id_) with a converted representation of it (converted_).
+ public:
+ // Copies myid into id_ through copyInto(); converted_ is only default-initialized.
+ IdentifierBase(T myid) {
+ copyInto(myid);
+ }
+ // Copy constructor: duplicates id_ only, converted_ is not taken from other.
+ IdentifierBase(const IdentifierBase &other) {
+ copyInto(other.id_);
+ }
+ // Default constructor: id_ is not explicitly initialized here.
+ IdentifierBase() {
+ }
+ // Copy assignment: overwrites id_ only, converted_ keeps its current value.
+ IdentifierBase &operator=(const IdentifierBase &other) {
+ copyInto(other.id_);
+ return *this;
+ }
+ // Assignment from a raw identifier: overwrites id_ only.
+ IdentifierBase &operator=(T o) {
+ copyInto(o);
+ return *this;
+ }
+ // Copies id_ out into other. NOTE(review): const member calling the non-const copyOutOf() - confirm this instantiates.
+ void getIdentifier(T other) const {
+ copyOutOf(other);
+ }
+ // Returns converted_ by value.
+ C convert() const {
+ return converted_;
+ }
+
+ protected:
+ // The helpers below move exactly sizeof(T) bytes with memcpy. NOTE(review): confirm <cstring> is reachable for memcpy.
+ void copyInto(const IdentifierBase &other){
+ memcpy(id_, other.id_, sizeof(T));
+ }
+ // Overwrites id_ from a raw buffer; other must point to at least sizeof(T) readable bytes.
+ void copyInto(const void *other) {
+ memcpy(id_, other, sizeof(T));
+ }
+ // Writes id_ into a raw buffer; other must point to at least sizeof(T) writable bytes.
+ void copyOutOf(void *other) {
+ memcpy(other, id_, sizeof(T));
+ }
+ // Converted form returned by convert(); nothing in this base class assigns it.
+ C converted_;
+ // Raw identifier bytes; assumes T is an array type so id_ decays to a pointer in memcpy - TODO confirm.
+ T id_;
+};
+
+class Identifier : public IdentifierBase<UUID_FIELD, std::string> {  // IdentifierBase with UUID_FIELD storage and a std::string converted form; members are only declared in this hunk.
+ public:
+ Identifier(UUID_FIELD u);
+ Identifier();
+ Identifier(const Identifier &other);
+ Identifier(const IdentifierBase &other);
+ // Assignment overloads taking the same argument types as the constructors above.
+ Identifier &operator=(const IdentifierBase &other);
+ Identifier &operator=(const Identifier &other);
+ Identifier &operator=(UUID_FIELD o);
+ // Assignment from a string and comparison against nullptr; their definitions are not part of this hunk.
+ Identifier &operator=(std::string id);
+ bool operator==(std::nullptr_t nullp);
+
+ bool operator!=(std::nullptr_t nullp);
+ // NOTE(review): none of the comparison operators are const-qualified, so they cannot be invoked on a const Identifier.
+ bool operator!=(const Identifier &other);
+ bool operator==(const Identifier &other);
+ // NOTE(review): to_string() and toArray() are declared non-const as well.
+ std::string to_string();
+
+ unsigned char *toArray();
+
+ protected:
+
+ void build_string();
+
+};
+
class IdGenerator {
public:
- void generate(uuid_t output);
+ void generate(Identifier &output);
+ Identifier generate();
void initialize(const std::shared_ptr<Properties> & properties);
static std::shared_ptr<IdGenerator> getIdGenerator() {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/utils/StringUtils.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index 7f33260..24adae1 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -17,6 +17,11 @@
#ifndef LIBMINIFI_INCLUDE_IO_STRINGUTILS_H_
#define LIBMINIFI_INCLUDE_IO_STRINGUTILS_H_
#include <iostream>
+#include <functional>
+#ifdef WIN32
+ #include <cwctype>
+ #include <cctype>
+#endif
#include <algorithm>
#include <sstream>
#include <vector>
@@ -68,7 +73,7 @@ class StringUtils {
* @returns modified string
*/
static inline std::string trimLeft(std::string s) {
- s.erase(s.begin(), std::find_if(s.begin(), s.end(), std::not1(std::pointer_to_unary_function<int, int>(std::isspace))));
+ s.erase(s.begin(), std::find_if(s.begin(), s.end(), std::not1(std::pointer_to_unary_function<int, int>(isspace))));
return s;
}
@@ -79,7 +84,7 @@ class StringUtils {
*/
static inline std::string trimRight(std::string s) {
- s.erase(std::find_if(s.rbegin(), s.rend(), std::not1(std::pointer_to_unary_function<int, int>(std::isspace))).base(), s.end());
+ s.erase(std::find_if(s.rbegin(), s.rend(), std::not1(std::pointer_to_unary_function<int, int>(isspace))).base(), s.end());
return s;
}
@@ -88,7 +93,7 @@ class StringUtils {
*/
static inline bool equalsIgnoreCase(const std::string &left, const std::string right) {
if (left.length() == right.length()) {
- return std::equal(right.begin(), right.end(), left.begin(), [](unsigned char lc, unsigned char rc) {return std::tolower(lc) == std::tolower(rc);});
+ return std::equal(right.begin(), right.end(), left.begin(), [](unsigned char lc, unsigned char rc) {return tolower(lc) == tolower(rc);});
} else {
return false;
}
@@ -200,7 +205,7 @@ class StringUtils {
inline static bool endsWithIgnoreCase(const std::string &value, const std::string & endString) {
if (endString.size() > value.size())
return false;
- return std::equal(endString.rbegin(), endString.rend(), value.rbegin(), [](unsigned char lc, unsigned char rc) {return std::tolower(lc) == std::tolower(rc);});
+ return std::equal(endString.rbegin(), endString.rend(), value.rbegin(), [](unsigned char lc, unsigned char rc) {return tolower(lc) == tolower(rc);});
}
inline static bool endsWith(const std::string &value, const std::string & endString) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/utils/TimeUtil.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/TimeUtil.h b/libminifi/include/utils/TimeUtil.h
index 19c2566..0dc3b5c 100644
--- a/libminifi/include/utils/TimeUtil.h
+++ b/libminifi/include/utils/TimeUtil.h
@@ -18,9 +18,6 @@
#define __TIME_UTIL_H__
#include <time.h>
-#include <sys/time.h>
-#include <string.h>
-#include <unistd.h>
#include <string.h>
#include <iomanip>
#include <sstream>
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/utils/file/DiffUtils.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/file/DiffUtils.h b/libminifi/include/utils/file/DiffUtils.h
index d44360d..313e0f6 100644
--- a/libminifi/include/utils/file/DiffUtils.h
+++ b/libminifi/include/utils/file/DiffUtils.h
@@ -24,10 +24,8 @@
#else
#include <cstdlib>
#include <sys/stat.h>
-#include <dirent.h>
#endif
#include <cstdio>
-#include <unistd.h>
#include <fcntl.h>
#ifdef WIN32
#define stat _stat
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/utils/file/FileManager.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/file/FileManager.h b/libminifi/include/utils/file/FileManager.h
index f6a4312..247f249 100644
--- a/libminifi/include/utils/file/FileManager.h
+++ b/libminifi/include/utils/file/FileManager.h
@@ -23,7 +23,6 @@
#include <cstdlib>
#endif
#include <cstdio>
-#include <unistd.h>
#include <fcntl.h>
#include "io/validation.h"
#include "utils/Id.h"
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/utils/file/FileUtils.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/file/FileUtils.h b/libminifi/include/utils/file/FileUtils.h
index 33ca437..dd307ee 100644
--- a/libminifi/include/utils/file/FileUtils.h
+++ b/libminifi/include/utils/file/FileUtils.h
@@ -17,6 +17,9 @@
#ifndef LIBMINIFI_INCLUDE_UTILS_FILEUTILS_H_
#define LIBMINIFI_INCLUDE_UTILS_FILEUTILS_H_
+
+
+
#include <sstream>
#include <fstream>
#ifdef BOOST_VERSION
@@ -24,14 +27,30 @@
#else
#include <cstring>
#include <cstdlib>
+#ifdef WIN32
+#define WIN32_LEAN_AND_MEAN
+#include <WinSock2.h>
+#include <WS2tcpip.h>
+#include <Windows.h>
+#pragma comment(lib, "Ws2_32.lib")
+#else
#include <sys/stat.h>
#include <dirent.h>
#endif
+#endif
#include <cstdio>
+#ifndef WIN32
#include <unistd.h>
+#endif
#include <fcntl.h>
#ifdef WIN32
#define stat _stat
+#include <direct.h>
+#include <windows.h> // winapi
+#include <sys/stat.h> // stat
+#include <tchar.h> // _tcscpy,_tcscat,_tcscmp
+#include <string> // string
+#include <algorithm> // replace
#endif
namespace org {
@@ -72,6 +91,50 @@ class FileUtils {
//display error message
}
return 0;
+#elif defined(WIN32)
+ WIN32_FIND_DATA FindFileData;
+ HANDLE hFind;
+ DWORD Attributes;
+ std::string str;
+
+
+ std::stringstream pathstr;
+ pathstr << path << "\\.*";
+ str = pathstr.str();
+
+
+ //List files
+ hFind = FindFirstFile(str.c_str(), &FindFileData);
+ if (hFind != INVALID_HANDLE_VALUE)
+ {
+ do {
+ if (strcmp(FindFileData.cFileName, ".") != 0 && strcmp(FindFileData.cFileName, "..") != 0)
+ {
+ //Str append Example
+
+ std::stringstream strs;
+ strs << path << "\\" << FindFileData.cFileName;
+ str = strs.str();
+
+ //_tprintf (TEXT("File Found: %s\n"),str);
+ Attributes = GetFileAttributes(str.c_str());
+ if (Attributes & FILE_ATTRIBUTE_DIRECTORY)
+ {
+ //is directory
+ delete_dir(str, delete_files_recursively);
+ }
+ else
+ {
+ remove(str.c_str());
+ //not directory
+ }
+ }
+ } while (FindNextFile(hFind, &FindFileData));
+ FindClose(hFind);
+
+ RemoveDirectory(path.c_str());
+ }
+ return 0;
#else
DIR *current_directory = opendir(path.c_str());
int r = -1;
@@ -136,11 +199,15 @@ class FileUtils {
#else
struct stat dir_stat;
if (stat(path.c_str(), &dir_stat)) {
- mkdir(path.c_str(), 0700);
+#ifdef WIN32
+ _mkdir(path.c_str());
+#else
+ mkdir(path.c_str(), 0700);
+#endif
+ return 0;
}
- return 0;
+ return -1;
#endif
- return -1;
}
static int copy_file(const std::string &path_from, const std::string dest_path) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/opsys/posix/io/ClientSocket.h
----------------------------------------------------------------------
diff --git a/libminifi/opsys/posix/io/ClientSocket.h b/libminifi/opsys/posix/io/ClientSocket.h
new file mode 100644
index 0000000..4ed8b99
--- /dev/null
+++ b/libminifi/opsys/posix/io/ClientSocket.h
@@ -0,0 +1,296 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_IO_POSIX_CLIENTSOCKET_H_
+#define LIBMINIFI_INCLUDE_IO_POSIX_CLIENTSOCKET_H_
+
+
+#include <cstdint>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <unistd.h>
+#include <mutex>
+#include <atomic>
+#include "io/BaseStream.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "io/validation.h"
+#include "properties/Configure.h"
+#include "io/NetworkPrioritizer.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+/**
+ * Context class for socket. This is currently only used as a parent class for TLSContext. It is necessary so the Socket and TLSSocket constructors
+ * can be the same. It also gives us a common place to set timeouts, etc from the Configure object in the future.
+ */
+class SocketContext {
+ public:
+ SocketContext(const std::shared_ptr<Configure> &configure) {  // configure is not read here; the constructor body is empty. NOTE(review): single-argument constructor is not explicit.
+ }
+};
+/**
+ * Socket class.
+ * Purpose: Provides a general purpose socket interface that abstracts
+ * connecting information from users
+ * Design: Extends DataStream and allows us to perform most streaming
+ * operations against a BSD socket
+ *
+ *
+ */
+class Socket : public BaseStream {
+ public:
+ /**
+ * Constructor that creates a client socket.
+ * @param context the SocketContext
+ * @param hostname hostname we are connecting to.
+ * @param port port we are connecting to.
+ */
+ explicit Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port);
+
+ /**
+ * Move constructor. NOTE(review): the parameter is a const rvalue reference - confirm that is intended.
+ */
+ explicit Socket(const Socket &&);
+
+ /**
+ * Static function to return the current machine's host name. The name is computed by init_hostname() on the first call and kept in a function-local static.
+ */
+ static std::string getMyHostName() {
+ static std::string HOSTNAME = init_hostname();
+ return HOSTNAME;
+ }
+
+ /**
+ * Destructor
+ */
+
+ virtual ~Socket();
+
+ virtual void closeStream();
+ /**
+ * Initializes the socket
+ * @return result of the creation operation.
+ */
+ virtual int16_t initialize();
+ // Move-assigns the supplied interface into local_network_interface_.
+ virtual void setInterface(io::NetworkInterface &&interface) {
+ local_network_interface_ = std::move(interface);
+ }
+
+ /**
+ * Sets the non blocking flag on the file descriptor.
+ */
+ void setNonBlocking();
+
+ std::string getHostname() const;
+
+ /**
+ * Return the port for this socket
+ * @returns port
+ */
+ uint16_t getPort();
+
+ // data stream extensions
+ /**
+ * Reads data and places it into buf
+ * @param buf buffer in which we extract data
+ * @param buflen
+ * Forwards to the three-argument overload with retrieve_all_bytes set to true.
+ */
+ virtual int readData(std::vector<uint8_t> &buf, int buflen) {
+ return readData(buf, buflen, true);
+ }
+ /**
+ * Reads data and places it into buf
+ * @param buf buffer in which we extract data
+ * @param buflen
+ * Forwards to the three-argument overload with retrieve_all_bytes set to true.
+ */
+ virtual int readData(uint8_t *buf, int buflen) {
+ return readData(buf, buflen, true);
+ }
+
+ /**
+ * Reads data and places it into buf
+ * @param buf buffer in which we extract data
+ * @param buflen
+ * @param retrieve_all_bytes determines if we should read all bytes before returning
+ */
+ virtual int readData(std::vector<uint8_t> &buf, int buflen, bool retrieve_all_bytes);
+ /**
+ * Reads data and places it into buf
+ * @param buf buffer in which we extract data
+ * @param buflen
+ * @param retrieve_all_bytes determines if we should read all bytes before returning
+ */
+ virtual int readData(uint8_t *buf, int buflen, bool retrieve_all_bytes);
+
+ /**
+ * Write value to the stream using std::vector
+ * @param buf incoming buffer
+ * @param buflen buffer to write
+ *
+ */
+ virtual int writeData(std::vector<uint8_t> &buf, int buflen);
+
+ /**
+ * writes value to stream
+ * @param value value to write
+ * @param size size of value
+ */
+ virtual int writeData(uint8_t *value, int size);
+
+ /**
+ * Writes a system word
+ * @param value value to write
+ */
+ virtual int write(uint64_t value, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+ /**
+ * Writes a uint32_t
+ * @param value value to write
+ */
+ virtual int write(uint32_t value, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+ /**
+ * Writes a system short
+ * @param value value to write
+ */
+ virtual int write(uint16_t value, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+ /**
+ * Reads a system word
+ * @param value destination reference for the value that is read
+ */
+ virtual int read(uint64_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+ /**
+ * Reads a uint32_t
+ * @param value destination reference for the value that is read
+ */
+ virtual int read(uint32_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+ /**
+ * Reads a system short
+ * @param value destination reference for the value that is read
+ */
+ virtual int read(uint16_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+ /**
+ * Returns the underlying buffer
+ * @return vector's array
+ **/
+ const uint8_t *getBuffer() const {
+ return DataStream::getBuffer();
+ }
+
+ /**
+ * Retrieve size of data stream
+ * @return size of data stream
+ **/
+ const uint64_t getSize() const {
+ return DataStream::getSize();
+ }
+
+ protected:
+
+ /**
+ * Constructor that accepts host name, port and listeners. With this
+ * constructor we will be creating a server socket
+ * @param context the SocketContext
+ * @param hostname our host name
+ * @param port connecting port
+ * @param listeners number of listeners in the queue
+ */
+ explicit Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners);
+
+ /**
+ * Creates a vector and returns the vector using the provided
+ * type name.
+ * @param t incoming object
+ * @returns vector.
+ */
+ template<typename T>
+ std::vector<uint8_t> readBuffer(const T&);
+
+ /**
+ * Creates a connection using the address info object.
+ * @param p addrinfo structure.
+ * @returns fd.
+ */
+ virtual int8_t createConnection(const addrinfo *p, in_addr_t &addr);
+
+ /**
+ * Sets socket options depending on the instance.
+ * @param sock socket file descriptor.
+ */
+ virtual int16_t setSocketOptions(const int sock);
+
+ /**
+ * Attempt to select the socket file descriptor
+ * @param msec timeout interval to wait
+ * @returns file descriptor
+ */
+ virtual int16_t select_descriptor(const uint16_t msec);
+
+ addrinfo *addr_info_;
+
+ std::recursive_mutex selection_mutex_;
+
+ std::string requested_hostname_;
+ std::string canonical_hostname_;
+ uint16_t port_;
+
+ bool is_loopback_only_;
+ io::NetworkInterface local_network_interface_;
+
+ // connection information
+ int32_t socket_file_descriptor_;
+
+ fd_set total_list_;
+ fd_set read_fds_;
+ std::atomic<uint16_t> socket_max_;
+ std::atomic<uint64_t> total_written_;
+ std::atomic<uint64_t> total_read_;
+ uint16_t listeners_;
+
+
+ bool nonBlocking_;
+ private:
+ std::shared_ptr<logging::Logger> logger_;
+ static std::string init_hostname() {  // Builds the value cached by getMyHostName().
+ char hostname[1024];
+ gethostname(hostname, 1024);  // NOTE(review): the return value is not checked.
+ Socket mySock(nullptr, hostname, 0);  // temporary client socket with a null context and port 0
+ mySock.initialize();  // result of initialize() is ignored
+ auto resolved_hostname = mySock.getHostname();
+ return !IsNullOrEmpty(resolved_hostname) ? resolved_hostname : hostname;  // fall back to the gethostname() text when IsNullOrEmpty() flags the socket-reported name
+ }
+};
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif /* LIBMINIFI_INCLUDE_IO_POSIX_CLIENTSOCKET_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/opsys/win/io/ClientSocket.h
----------------------------------------------------------------------
diff --git a/libminifi/opsys/win/io/ClientSocket.h b/libminifi/opsys/win/io/ClientSocket.h
new file mode 100644
index 0000000..3621215
--- /dev/null
+++ b/libminifi/opsys/win/io/ClientSocket.h
@@ -0,0 +1,343 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_IO_WIN__CLIENTSOCKET_H_
+#define LIBMINIFI_INCLUDE_IO_WIN__CLIENTSOCKET_H_
+
+#include <cstdint>
+#include <sys/types.h>
+#ifdef WIN32
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif
+//#include <WinSock.h>
+//#include <WS2tcpip.h>
+#include <Windows.h>
+#include <WS2tcpip.h>
+#pragma comment(lib, "Ws2_32.lib")
+#else
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <unistd.h>
+#endif
+#include <mutex>
+#include <atomic>
+#include "io/BaseStream.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "io/validation.h"
+#include "properties/Configure.h"
+#include "io/NetworkPrioritizer.h"
+
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+#ifdef WIN32
+ typedef SOCKET SocketDescriptor;
+#else
+ typedef int SocketDescriptor;
+ #define INVALID_SOCKET -1
+#endif
+
+
+/**
+ * Context class for socket. This is currently only used as a parent class for TLSContext. It is necessary so the Socket and TLSSocket constructors
+ * can be the same. It also gives us a common place to set timeouts, etc from the Configure object in the future.
+ */
+class SocketContext {
+ public:
+ SocketContext(const std::shared_ptr<Configure> &configure) {
+ }
+};
+/**
+ * Socket class.
+ * Purpose: Provides a general purpose socket interface that abstracts
+ * connecting information from users
+ * Design: Extends DataStream and allows us to perform most streaming
+ * operations against a BSD socket
+ *
+ *
+ */
+class Socket : public BaseStream {
+public:
+ /**
+ * Constructor that creates a client socket.
+ * @param context the SocketContext
+ * @param hostname hostname we are connecting to.
+ * @param port port we are connecting to.
+ */
+ explicit Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port);
+
+ /**
+ * Move constructor.
+ */
+ explicit Socket(const Socket &&);
+
+ /**
+ * Static function to return the current machine's host name
+ */
+ static std::string getMyHostName() {
+ static std::string HOSTNAME = init_hostname();
+ return HOSTNAME;
+ }
+
+ /**
+ * Destructor
+ */
+
+ virtual ~Socket();
+
+ virtual void closeStream();
+ /**
+ * Initializes the socket
+ * @return result of the creation operation.
+ */
+ virtual int16_t initialize();
+
+ virtual void setInterface(io::NetworkInterface &&ifc) {
+ local_network_interface_ = std::move(ifc);
+ }
+
+ /**
+ * Sets the non blocking flag on the file descriptor.
+ */
+ void setNonBlocking();
+
+ std::string getHostname() const;
+
+ /**
+ * Return the port for this socket
+ * @returns port
+ */
+ uint16_t getPort();
+
+ // data stream extensions
+ /**
+ * Reads data and places it into buf
+ * @param buf buffer in which we extract data
+ * @param buflen
+ * Forwards to the three-argument overload with retrieve_all_bytes set to true.
+ */
+ virtual int readData(std::vector<uint8_t> &buf, int buflen) {
+ return readData(buf, buflen, true);
+ }
+ /**
+ * Reads data and places it into buf
+ * @param buf buffer in which we extract data
+ * @param buflen
+ * Forwards to the three-argument overload with retrieve_all_bytes set to true.
+ */
+ virtual int readData(uint8_t *buf, int buflen) {
+ return readData(buf, buflen, true);
+ }
+
+ /**
+ * Reads data and places it into buf
+ * @param buf buffer in which we extract data
+ * @param buflen
+ * @param retrieve_all_bytes determines if we should read all bytes before returning
+ */
+ virtual int readData(std::vector<uint8_t> &buf, int buflen, bool retrieve_all_bytes);
+ /**
+ * Reads data and places it into buf
+ * @param buf buffer in which we extract data
+ * @param buflen
+ * @param retrieve_all_bytes determines if we should read all bytes before returning
+ */
+ virtual int readData(uint8_t *buf, int buflen, bool retrieve_all_bytes);
+
+ /**
+ * Write value to the stream using std::vector
+ * @param buf incoming buffer
+ * @param buflen buffer to write
+ *
+ */
+ virtual int writeData(std::vector<uint8_t> &buf, int buflen);
+
+ /**
+ * writes value to stream
+ * @param value value to write
+ * @param size size of value
+ */
+ virtual int writeData(uint8_t *value, int size);
+
+ /**
+ * Writes a system word
+ * @param value value to write
+ */
+ virtual int write(uint64_t value, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+ /**
+ * Writes a uint32_t
+ * @param value value to write
+ */
+ virtual int write(uint32_t value, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+ /**
+ * Writes a system short
+ * @param value value to write
+ */
+ virtual int write(uint16_t value, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+ /**
+ * Reads a system word
+ * @param value receives the value read from the stream
+ */
+ virtual int read(uint64_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+ /**
+ * Reads a uint32_t
+ * @param value receives the value read from the stream
+ */
+ virtual int read(uint32_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+ /**
+ * Reads a system short
+ * @param value receives the value read from the stream
+ */
+ virtual int read(uint16_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+ /**
+ * Returns the underlying buffer
+ * @return vector's array
+ **/
+ const uint8_t *getBuffer() const {
+ return DataStream::getBuffer();
+ }
+
+ /**
+ * Retrieve size of data stream
+ * @return size of data stream
+ **/
+ const uint64_t getSize() const {
+ return DataStream::getSize();
+ }
+
+protected:
+
+ /**
+ * Constructor that accepts host name, port and listeners. With this
+ * constructor we will be creating a server socket
+ * @param context the SocketContext
+ * @param hostname our host name
+ * @param port connecting port
+ * @param listeners number of listeners in the queue
+ */
+ explicit Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners);
+
+ /**
+ * Creates a vector and returns the vector using the provided
+ * type name.
+ * @param t incoming object
+ * @returns vector.
+ */
+ template<typename T>
+ std::vector<uint8_t> readBuffer(const T&);
+
+ /**
+ * Creates a connection using the address info object.
+ * @param p addrinfo structure.
+ * @returns fd.
+ */
+#ifdef WIN32
+ virtual int8_t createConnection(const addrinfo *p, struct in_addr &addr);
+#else
+ virtual int8_t createConnection(const addrinfo *p, in_addr_t &addr);
+#endif
+
+ /**
+ * Sets socket options depending on the instance.
+ * @param sock socket file descriptor.
+ */
+ virtual int16_t setSocketOptions(const SocketDescriptor sock);
+
+ /**
+ * Attempt to select the socket file descriptor
+ * @param msec timeout interval to wait
+ * @returns file descriptor
+ */
+ virtual int16_t select_descriptor(const uint16_t msec);
+
+
+ addrinfo *addr_info_;
+
+ std::recursive_mutex selection_mutex_;
+
+ std::string requested_hostname_;
+ std::string canonical_hostname_;
+ uint16_t port_;
+
+ bool is_loopback_only_;
+ io::NetworkInterface local_network_interface_;
+
+ // connection information
+ SocketDescriptor socket_file_descriptor_;
+
+ fd_set total_list_;
+ fd_set read_fds_;
+ std::atomic<uint16_t> socket_max_;
+ std::atomic<uint64_t> total_written_;
+ std::atomic<uint64_t> total_read_;
+ uint16_t listeners_;
+
+ bool nonBlocking_;
+private:
+
+ class SocketInitializer
+ {
+ public:
+ SocketInitializer() {
+ #ifdef WIN32
+ static WSADATA s_wsaData;
+ int iWinSockInitResult = WSAStartup(MAKEWORD(2, 2), &s_wsaData);
+ #endif
+ }
+ ~SocketInitializer() {
+ #ifdef WIN32
+ WSACleanup();
+ #endif
+ }
+ };
+ static void initialize_socket() {
+ static SocketInitializer initialized;
+ }
+
+
+ std::shared_ptr<logging::Logger> logger_;
+
+
+ static std::string init_hostname() {
+ char hostname[1024];
+ gethostname(hostname, 1024);
+ Socket mySock(nullptr, hostname, 0);
+ mySock.initialize();
+ auto resolved_hostname = mySock.getHostname();
+ return !IsNullOrEmpty(resolved_hostname) ? resolved_hostname : hostname;
+ }
+};
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif /* LIBMINIFI_INCLUDE_IO_WIN__CLIENTSOCKET_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/Connection.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index 9513f68..f9a8fcb 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -18,7 +18,6 @@
* limitations under the License.
*/
#include "Connection.h"
-#include <sys/time.h>
#include <time.h>
#include <vector>
#include <queue>
@@ -39,17 +38,64 @@ namespace apache {
namespace nifi {
namespace minifi {
-Connection::Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, uuid_t uuid, uuid_t srcUUID,
- uuid_t destUUID)
+Connection::Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name)
+ : core::Connectable(name),
+ flow_repository_(flow_repository),
+ content_repo_(content_repo),
+ logger_(logging::LoggerFactory<Connection>::getLogger()) {
+ source_connectable_ = nullptr;
+ dest_connectable_ = nullptr;
+ max_queue_size_ = 0;
+ max_data_queue_size_ = 0;
+ expired_duration_ = 0;
+ queued_data_size_ = 0;
+
+ logger_->log_debug("Connection %s created", name_);
+}
+
+Connection::Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, utils::Identifier & uuid)
+ : core::Connectable(name, uuid),
+ flow_repository_(flow_repository),
+ content_repo_(content_repo),
+ logger_(logging::LoggerFactory<Connection>::getLogger()) {
+ source_connectable_ = nullptr;
+ dest_connectable_ = nullptr;
+ max_queue_size_ = 0;
+ max_data_queue_size_ = 0;
+ expired_duration_ = 0;
+ queued_data_size_ = 0;
+
+ logger_->log_debug("Connection %s created", name_);
+}
+
+Connection::Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, utils::Identifier & uuid,
+ utils::Identifier & srcUUID)
+ : core::Connectable(name, uuid),
+ flow_repository_(flow_repository),
+ content_repo_(content_repo),
+ logger_(logging::LoggerFactory<Connection>::getLogger()) {
+
+ src_uuid_ = srcUUID;
+
+ source_connectable_ = nullptr;
+ dest_connectable_ = nullptr;
+ max_queue_size_ = 0;
+ max_data_queue_size_ = 0;
+ expired_duration_ = 0;
+ queued_data_size_ = 0;
+
+ logger_->log_debug("Connection %s created", name_);
+}
+
+Connection::Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, utils::Identifier & uuid,
+ utils::Identifier & srcUUID, utils::Identifier & destUUID)
: core::Connectable(name, uuid),
flow_repository_(flow_repository),
content_repo_(content_repo),
logger_(logging::LoggerFactory<Connection>::getLogger()) {
- if (srcUUID)
- uuid_copy(src_uuid_, srcUUID);
- if (destUUID)
- uuid_copy(dest_uuid_, destUUID);
+ src_uuid_ = srcUUID;
+ dest_uuid_ = destUUID;
source_connectable_ = nullptr;
dest_connectable_ = nullptr;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/FlowControlProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowControlProtocol.cpp b/libminifi/src/FlowControlProtocol.cpp
index 60122b0..60f701a 100644
--- a/libminifi/src/FlowControlProtocol.cpp
+++ b/libminifi/src/FlowControlProtocol.cpp
@@ -18,10 +18,8 @@
* limitations under the License.
*/
#include "FlowControlProtocol.h"
-#include <sys/time.h>
#include <stdio.h>
#include <time.h>
-#include <netinet/tcp.h>
#include <chrono>
#include <thread>
#include <string>
@@ -35,96 +33,12 @@ namespace nifi {
namespace minifi {
int FlowControlProtocol::connectServer(const char *host, uint16_t port) {
- in_addr_t addr;
- int sock = 0;
- struct hostent *h;
-#ifdef __MACH__
- h = gethostbyname(host);
-#else
- char buf[1024];
- struct hostent he;
- int hh_errno;
- gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno);
-#endif
- memcpy(reinterpret_cast<char*>(&addr), h->h_addr_list[0], h->h_length);
- sock = socket(AF_INET, SOCK_STREAM, 0);
- if (sock < 0) {
- logger_->log_error("Could not create socket to hostName %s", host);
- return 0;
- }
-
-#ifndef __MACH__
- int opt = 1;
- bool nagle_off = true;
-
- if (nagle_off) {
- if (setsockopt(sock, SOL_TCP, TCP_NODELAY, reinterpret_cast<void*>(&opt), sizeof(opt)) < 0) {
- logger_->log_error("setsockopt() TCP_NODELAY failed");
- close(sock);
- return 0;
- }
- if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) {
- logger_->log_error("setsockopt() SO_REUSEADDR failed");
- close(sock);
- return 0;
- }
- }
-
- int sndsize = 256 * 1024;
- if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&sndsize), sizeof(sndsize)) < 0) {
- logger_->log_error("setsockopt() SO_SNDBUF failed");
- close(sock);
- return 0;
- }
-#endif
-
- struct sockaddr_in sa;
- socklen_t socklen;
- int status;
-
- memset(&sa, 0, sizeof(sa));
- sa.sin_family = AF_INET;
- sa.sin_addr.s_addr = htonl(INADDR_ANY);
- sa.sin_port = htons(0);
- socklen = sizeof(sa);
- if (bind(sock, (struct sockaddr *) &sa, socklen) < 0) {
- logger_->log_error("socket bind failed");
- close(sock);
- return 0;
- }
-
- memset(&sa, 0, sizeof(sa));
- sa.sin_family = AF_INET;
- sa.sin_addr.s_addr = addr;
- sa.sin_port = htons(port);
- socklen = sizeof(sa);
-
- status = connect(sock, (struct sockaddr *) &sa, socklen);
-
- if (status < 0) {
- logger_->log_error("socket connect failed to %s %ll", host, port);
- close(sock);
- return 0;
- }
-
- logger_->log_debug("Flow Control Protocol socket %ll connect to server %s port %ll success", sock, host, port);
-
- return sock;
+ in_addr addr;
+ return 0;
}
int FlowControlProtocol::sendData(uint8_t *buf, int buflen) {
- int ret = 0, bytes = 0;
-
- while (bytes < buflen) {
- ret = send(_socket, buf + bytes, buflen - bytes, 0);
- // check for errors
- if (ret == -1) {
- return ret;
- }
- bytes += ret;
- }
-
- return bytes;
+ return 0;
}
int FlowControlProtocol::selectClient(int msec) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 12ce99f..c840f14 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -18,11 +18,9 @@
* limitations under the License.
*/
#include "FlowController.h"
-#include <sys/time.h>
#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
-#include <unistd.h>
#include <vector>
#include <queue>
#include <map>
@@ -56,6 +54,13 @@
#include "core/Connectable.h"
#include "utils/HTTPClient.h"
+
+#ifdef _MSC_VER
+#ifndef PATH_MAX
+#define PATH_MAX 260
+#endif
+#endif
+
namespace org {
namespace apache {
namespace nifi {
@@ -63,7 +68,7 @@ namespace minifi {
std::shared_ptr<utils::IdGenerator> FlowController::id_generator_ = utils::IdGenerator::getIdGenerator();
-#define DEFAULT_CONFIG_NAME "conf/flow.yml"
+#define DEFAULT_CONFIG_NAME "conf/config.yml"
FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure,
std::unique_ptr<core::FlowConfiguration> flow_configuration, std::shared_ptr<core::ContentRepository> content_repo, const std::string name, bool headless_mode)
@@ -133,8 +138,12 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
void FlowController::initializePaths(const std::string &adjustedFilename) {
char *path = NULL;
+#ifndef WIN32
char full_path[PATH_MAX];
path = realpath(adjustedFilename.c_str(), full_path);
+#else
+ path = const_cast<char*>(adjustedFilename.c_str());
+#endif
if (path == NULL) {
throw std::runtime_error("Path is not specified. Either manually set MINIFI_HOME or ensure ../conf exists");
@@ -144,7 +153,7 @@ void FlowController::initializePaths(const std::string &adjustedFilename) {
logger_->log_info("FlowController NiFi Configuration file %s", pathString);
if (!path) {
- logger_->log_error("Could not locate path from provided configuration file name (%s). Exiting.", full_path);
+ logger_->log_error("Could not locate path from provided configuration file name (%s). Exiting.", path);
exit(1);
}
}
@@ -540,7 +549,7 @@ void FlowController::loadC2ResponseConfiguration(const std::string &prefix) {
std::string name;
if (configuration_->get(nameOption.str(), name)) {
- std::shared_ptr<state::response::ResponseNode> new_node = std::make_shared<state::response::ObjectNode>(name, nullptr);
+ std::shared_ptr<state::response::ResponseNode> new_node = std::make_shared<state::response::ObjectNode>(name);
if (configuration_->get(classOption.str(), class_definitions)) {
std::vector<std::string> classes = utils::StringUtils::split(class_definitions, ",");
@@ -601,7 +610,7 @@ std::shared_ptr<state::response::ResponseNode> FlowController::loadC2ResponseCon
std::string name;
if (configuration_->get(nameOption.str(), name)) {
- std::shared_ptr<state::response::ResponseNode> new_node = std::make_shared<state::response::ObjectNode>(name, nullptr);
+ std::shared_ptr<state::response::ResponseNode> new_node = std::make_shared<state::response::ObjectNode>(name);
if (name.find(",") != std::string::npos) {
std::vector<std::string> sub_classes = utils::StringUtils::split(name, ",");
for (std::string subClassStr : classes) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 7815e70..8868c9a 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -18,7 +18,6 @@
* limitations under the License.
*/
#include "FlowFileRecord.h"
-#include <sys/time.h>
#include <time.h>
#include <cstdio>
#include <vector>
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/Properties.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Properties.cpp b/libminifi/src/Properties.cpp
index cb3b752..f5537fc 100644
--- a/libminifi/src/Properties.cpp
+++ b/libminifi/src/Properties.cpp
@@ -104,8 +104,12 @@ void Properties::loadConfigureFile(const char *fileName) {
}
}
char *path = NULL;
+#ifndef WIN32
char full_path[PATH_MAX];
path = realpath(adjustedFilename.c_str(), full_path);
+#else
+ path = const_cast<char*>(adjustedFilename.c_str());
+#endif
logger_->log_info("Using configuration file located at %s", path);
std::ifstream file(path, std::ifstream::in);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
index e52a2c2..8bed630 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -20,8 +20,6 @@
#include "RemoteProcessorGroupPort.h"
-#include <curl/curl.h>
-#include <curl/easy.h>
#include <uuid/uuid.h>
#include <algorithm>
#include <cstdint>
@@ -133,8 +131,8 @@ void RemoteProcessorGroupPort::initialize() {
void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
std::string value;
- if (context->getProperty(portUUID.getName(), value)) {
- uuid_parse(value.c_str(), protocol_uuid_);
+ if (context->getProperty(portUUID.getName(), value) && !value.empty()) {
+ protocol_uuid_ = value;
}
std::string context_name;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/c2/C2Agent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 2514906..1ef8eea 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -17,7 +17,6 @@
*/
#include "c2/C2Agent.h"
-#include <unistd.h>
#include <csignal>
#include <utility>
#include <vector>
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/c2/protocols/RESTProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/protocols/RESTProtocol.cpp b/libminifi/src/c2/protocols/RESTProtocol.cpp
index 391ed81..d1e8b9a 100644
--- a/libminifi/src/c2/protocols/RESTProtocol.cpp
+++ b/libminifi/src/c2/protocols/RESTProtocol.cpp
@@ -33,6 +33,7 @@ namespace minifi {
namespace c2 {
const C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, const std::vector<char> &response) {
+#ifndef WIN32
rapidjson::Document root;
try {
@@ -126,6 +127,7 @@ const C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, const
}
} catch (...) {
}
+#endif
return std::move(C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true));
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/capi/C2CallbackAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/C2CallbackAgent.cpp b/libminifi/src/capi/C2CallbackAgent.cpp
index d8da5d6..dd04466 100644
--- a/libminifi/src/capi/C2CallbackAgent.cpp
+++ b/libminifi/src/capi/C2CallbackAgent.cpp
@@ -17,7 +17,6 @@
*/
#include "capi/C2CallbackAgent.h"
-#include <unistd.h>
#include <csignal>
#include <utility>
#include <vector>
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/capi/Plan.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp
index 6181382..595a01a 100644
--- a/libminifi/src/capi/Plan.cpp
+++ b/libminifi/src/capi/Plan.cpp
@@ -22,6 +22,8 @@
#include <set>
#include <string>
+std::shared_ptr<utils::IdGenerator> ExecutionPlan::id_generator_ = utils::IdGenerator::getIdGenerator();
+
ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo)
: content_repo_(content_repo),
flow_repo_(flow_repo),
@@ -38,8 +40,8 @@ std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::shared_p
return nullptr;
}
- uuid_t uuid;
- uuid_generate(uuid);
+ utils::Identifier uuid;
+ id_generator_->generate(uuid);
processor->setStreamFactory(stream_factory);
// initialize the processor
@@ -66,7 +68,7 @@ std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::shared_p
connection->setSource(last);
connection->setDestination(processor);
- uuid_t uuid_copy, uuid_copy_next;
+ utils::Identifier uuid_copy, uuid_copy_next;
last->getUUID(uuid_copy);
connection->setSourceUUID(uuid_copy);
processor->getUUID(uuid_copy_next);
@@ -95,8 +97,8 @@ std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::string &
return nullptr;
}
- uuid_t uuid;
- uuid_generate(uuid);
+ utils::Identifier uuid;
+ id_generator_->generate(uuid);
auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, uuid);
if (nullptr == ptr) {
@@ -186,7 +188,7 @@ std::shared_ptr<minifi::Connection> ExecutionPlan::buildFinalConnection(std::sha
if (setDest)
connection->setDestination(processor);
- uuid_t uuid_copy;
+ utils::Identifier uuid_copy;
last->getUUID(uuid_copy);
connection->setSourceUUID(uuid_copy);
if (setDest)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/controllers/LinuxPowerManagementService.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/controllers/LinuxPowerManagementService.cpp b/libminifi/src/controllers/LinuxPowerManagementService.cpp
index 836c9d3..a54cb7b 100644
--- a/libminifi/src/controllers/LinuxPowerManagementService.cpp
+++ b/libminifi/src/controllers/LinuxPowerManagementService.cpp
@@ -40,7 +40,7 @@ bool LinuxPowerManagerService::isAboveMax(int new_tasks) {
}
uint16_t LinuxPowerManagerService::getMaxThreads() {
- return std::numeric_limits<uint16_t>::max();
+ return (std::numeric_limits<uint16_t>::max)();
}
bool LinuxPowerManagerService::canIncrease() {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/controllers/NetworkPrioritizerService.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/controllers/NetworkPrioritizerService.cpp b/libminifi/src/controllers/NetworkPrioritizerService.cpp
index 63210ec..d92758a 100644
--- a/libminifi/src/controllers/NetworkPrioritizerService.cpp
+++ b/libminifi/src/controllers/NetworkPrioritizerService.cpp
@@ -21,15 +21,18 @@
#include <limits>
#include <string>
#include <vector>
-#include <sys/ioctl.h>
+#ifndef WIN32
#include <ifaddrs.h>
#include <net/if.h>
+#include <sys/ioctl.h>
#include <netinet/in.h>
-#include <string.h>
#include <sys/socket.h>
#include <netdb.h>
-#include <stdlib.h>
#include <unistd.h>
+#endif
+#include <string.h>
+#include <stdlib.h>
+
#include <set>
#include "utils/StringUtils.h"
#if ( defined(__APPLE__) || defined(__MACH__) || defined(BSD))
@@ -112,6 +115,7 @@ std::string NetworkPrioritizerService::get_nearest_interface(const std::vector<s
}
bool NetworkPrioritizerService::interface_online(const std::string &ifc) {
+#ifndef WIN32
struct ifreq ifr;
auto sockid = socket(PF_INET6, SOCK_DGRAM, IPPROTO_IP);
memset(&ifr, 0, sizeof(ifr));
@@ -121,6 +125,9 @@ bool NetworkPrioritizerService::interface_online(const std::string &ifc) {
}
close(sockid);
return (ifr.ifr_flags & IFF_UP) && (ifr.ifr_flags & IFF_RUNNING);
+#else
+ return false;
+#endif
}
std::vector<std::string> NetworkPrioritizerService::getInterfaces(uint32_t size = 0) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/controllers/SSLContextService.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/controllers/SSLContextService.cpp b/libminifi/src/controllers/SSLContextService.cpp
index 352ba37..8d0a997 100644
--- a/libminifi/src/controllers/SSLContextService.cpp
+++ b/libminifi/src/controllers/SSLContextService.cpp
@@ -17,8 +17,11 @@
*/
#include "controllers/SSLContextService.h"
+
+#ifdef OPENSSL_SUPPORT
#include <openssl/err.h>
#include <openssl/ssl.h>
+#endif
#include <string>
#include <memory>
#include <set>
@@ -44,7 +47,13 @@ void SSLContextService::initialize() {
initialized_ = true;
}
+/**
+ * If OpenSSL is not installed we may still continue operations. Nullptr will
+ * be returned and it will be up to the caller to determine if this failure is
+ * recoverable.
+ */
std::unique_ptr<SSLContext> SSLContextService::createSSLContext() {
+#ifdef OPENSSL_SUPPORT
SSL_library_init();
const SSL_METHOD *method;
@@ -82,6 +91,9 @@ std::unique_ptr<SSLContext> SSLContextService::createSSLContext() {
logger_->log_error("Can not load CA certificate %s, Exiting, error : %s", ca_certificate_, std::strerror(errno));
}
return std::unique_ptr<SSLContext>(new SSLContext(ctx));
+#else
+ return nullptr;
+#endif
}
const std::string &SSLContextService::getCertificateFile() {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/controllers/UpdatePolicyControllerService.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/controllers/UpdatePolicyControllerService.cpp b/libminifi/src/controllers/UpdatePolicyControllerService.cpp
index 08faad2..ae56fe1 100644
--- a/libminifi/src/controllers/UpdatePolicyControllerService.cpp
+++ b/libminifi/src/controllers/UpdatePolicyControllerService.cpp
@@ -21,15 +21,8 @@
#include <limits>
#include <string>
#include <vector>
-#include <sys/ioctl.h>
-#include <ifaddrs.h>
-#include <net/if.h>
-#include <netinet/in.h>
#include <string.h>
-#include <sys/socket.h>
-#include <netdb.h>
#include <stdlib.h>
-#include <unistd.h>
#include <set>
#include "utils/StringUtils.h"
#if ( defined(__APPLE__) || defined(__MACH__) || defined(BSD))
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/ClassLoader.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ClassLoader.cpp b/libminifi/src/core/ClassLoader.cpp
index 5c2fdf8..9c1aa67 100644
--- a/libminifi/src/core/ClassLoader.cpp
+++ b/libminifi/src/core/ClassLoader.cpp
@@ -15,10 +15,8 @@
* limitations under the License.
*/
-#include <sys/mman.h>
#include <memory>
#include <string>
-
#include "core/ClassLoader.h"
namespace org {
@@ -35,6 +33,7 @@ ClassLoader &ClassLoader::getDefaultClassLoader() {
// populate ret
return ret;
}
+
uint16_t ClassLoader::registerResource(const std::string &resource, const std::string &resourceFunction) {
void *resource_ptr = nullptr;
if (resource.empty()) {
@@ -57,7 +56,7 @@ uint16_t ClassLoader::registerResource(const std::string &resource, const std::s
// load the symbols
createFactory* create_factory_func = reinterpret_cast<createFactory*>(dlsym(resource_ptr, resourceFunction.c_str()));
const char* dlsym_error = dlerror();
- if (dlsym_error) {
+ if ((dlsym_error != nullptr && strlen(dlsym_error) > 0) || create_factory_func == nullptr) {
return RESOURCE_FAILURE;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/ConfigurableComponent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp
index ff6d1e5..c413ab9 100644
--- a/libminifi/src/core/ConfigurableComponent.cpp
+++ b/libminifi/src/core/ConfigurableComponent.cpp
@@ -191,6 +191,7 @@ bool ConfigurableComponent::createDynamicProperty(const std::string &name, const
}
Property new_property(name, DEFAULT_DYNAMIC_PROPERTY_DESC, value, false, "", {}, {});
+ new_property.setSupportsExpressionLanguage(true);
logger_->log_info("Processor %s dynamic property '%s' value '%s'", name.c_str(), new_property.getName().c_str(), value.c_str());
dynamic_properties_[new_property.getName()] = new_property;
onDynamicPropertyModified({}, new_property);
@@ -205,6 +206,7 @@ bool ConfigurableComponent::setDynamicProperty(const std::string name, std::stri
Property &orig_property = it->second;
Property new_property = orig_property;
new_property.setValue(value);
+ new_property.setSupportsExpressionLanguage(true);
dynamic_properties_[new_property.getName()] = new_property;
onDynamicPropertyModified(orig_property, new_property);
logger_->log_debug("Component %s dynamic property name %s value %s", name, new_property.getName(), value);
@@ -222,6 +224,7 @@ bool ConfigurableComponent::updateDynamicProperty(const std::string &name, const
Property &orig_property = it->second;
Property new_property = orig_property;
new_property.addValue(value);
+ new_property.setSupportsExpressionLanguage(true);
dynamic_properties_[new_property.getName()] = new_property;
onDynamicPropertyModified(orig_property, new_property);
logger_->log_debug("Component %s dynamic property name %s value %s", name, new_property.getName(), value);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/Connectable.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Connectable.cpp b/libminifi/src/core/Connectable.cpp
index 2a98818..a5fb1b2 100644
--- a/libminifi/src/core/Connectable.cpp
+++ b/libminifi/src/core/Connectable.cpp
@@ -30,13 +30,21 @@ namespace nifi {
namespace minifi {
namespace core {
-Connectable::Connectable(std::string name, uuid_t uuid)
+Connectable::Connectable(std::string name, utils::Identifier &uuid)
: CoreComponent(name, uuid),
max_concurrent_tasks_(1),
connectable_version_(nullptr),
logger_(logging::LoggerFactory<Connectable>::getLogger()) {
}
+Connectable::Connectable(std::string name)
+ : CoreComponent(name),
+ max_concurrent_tasks_(1),
+ connectable_version_(nullptr),
+ logger_(logging::LoggerFactory<Connectable>::getLogger()) {
+}
+
+
Connectable::Connectable(const Connectable &&other)
: CoreComponent(std::move(other)),
max_concurrent_tasks_(std::move(other.max_concurrent_tasks_)),
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/Core.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Core.cpp b/libminifi/src/core/Core.cpp
index da9462b..e5c8d6c 100644
--- a/libminifi/src/core/Core.cpp
+++ b/libminifi/src/core/Core.cpp
@@ -29,31 +29,29 @@ namespace core {
std::shared_ptr<utils::IdGenerator> CoreComponent::id_generator_ = utils::IdGenerator::getIdGenerator();
// Set UUID
-void CoreComponent::setUUID(uuid_t uuid) {
- uuid_copy(uuid_, uuid);
- char uuidStr[37];
- uuid_unparse_lower(uuid_, uuidStr);
- uuidStr_ = uuidStr;
+void CoreComponent::setUUID(utils::Identifier &uuid) {
+ uuid_ = uuid;
+ uuidStr_ = uuid_.to_string();
}
void CoreComponent::setUUIDStr(const std::string uuidStr) {
- uuid_parse(uuidStr.c_str(), uuid_);
+ uuid_ = uuidStr;
uuidStr_ = uuidStr;
}
// Get UUID
-bool CoreComponent::getUUID(uuid_t uuid) {
- if (uuid) {
- uuid_copy(uuid, uuid_);
- return true;
- } else {
+bool CoreComponent::getUUID(utils::Identifier &uuid) {
+ if (uuid_ == nullptr) {
return false;
}
+ uuid = uuid_;
+ return true;
}
// Get UUID
-unsigned const char *CoreComponent::getUUID() {
- return uuid_;
-}
+/*
+ unsigned const char *CoreComponent::getUUID() {
+ return uuid_.getIdentifier();
+ }*/
// Set Processor Name
void CoreComponent::setName(const std::string name) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/FlowConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index 320797b..aea6d62 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -28,13 +28,15 @@ namespace nifi {
namespace minifi {
namespace core {
-std::vector<std::string> FlowConfiguration::statics_sl_funcs_;
-std::mutex FlowConfiguration::atomic_initialization_;
+static_initializers &get_static_functions() {
+ static static_initializers static_sl_funcs;
+ return static_sl_funcs;
+}
FlowConfiguration::~FlowConfiguration() {
}
-std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string name, uuid_t uuid) {
+std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string name, utils::Identifier & uuid) {
auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(name, uuid);
if (nullptr == ptr) {
logger_->log_error("No Processor defined for %s", name);
@@ -60,47 +62,47 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProvenanceReportTask()
}
std::unique_ptr<core::ProcessGroup> FlowConfiguration::updateFromPayload(const std::string &source, const std::string &yamlConfigPayload) {
- if (!source.empty()) {
- std::string host, protocol, path, query, url = source;
- int port;
- utils::parse_url(&url, &host, &port, &protocol, &path, &query);
-
- std::string flow_id, bucket_id;
- auto path_split = utils::StringUtils::split(path, "/");
- for (size_t i = 0; i < path_split.size(); i++) {
- const std::string &str = path_split.at(i);
- if (str == "flows") {
- if (i + 1 < path_split.size()) {
- flow_id = path_split.at(i + 1);
- i++;
- }
+ if (!source.empty()) {
+ std::string host, protocol, path, query, url = source;
+ int port;
+ utils::parse_url(&url, &host, &port, &protocol, &path, &query);
+
+ std::string flow_id, bucket_id;
+ auto path_split = utils::StringUtils::split(path, "/");
+ for (size_t i = 0; i < path_split.size(); i++) {
+ const std::string &str = path_split.at(i);
+ if (str == "flows") {
+ if (i + 1 < path_split.size()) {
+ flow_id = path_split.at(i + 1);
+ i++;
}
+ }
- if (str == "bucket") {
- if (i + 1 < path_split.size()) {
- bucket_id = path_split.at(i + 1);
- i++;
- }
+ if (str == "bucket") {
+ if (i + 1 < path_split.size()) {
+ bucket_id = path_split.at(i + 1);
+ i++;
}
}
- flow_version_->setFlowVersion(url, bucket_id, flow_id);
}
- return getRootFromPayload(yamlConfigPayload);
+ flow_version_->setFlowVersion(url, bucket_id, flow_id);
}
+ return getRootFromPayload(yamlConfigPayload);
+}
-std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(std::string name, uuid_t uuid, int version) {
+std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(std::string name, utils::Identifier & uuid, int version) {
return std::unique_ptr<core::ProcessGroup>(new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid, version));
}
-std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup(std::string name, uuid_t uuid) {
+std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup(std::string name, utils::Identifier & uuid) {
return std::unique_ptr<core::ProcessGroup>(new core::ProcessGroup(core::REMOTE_PROCESS_GROUP, name, uuid));
}
-std::shared_ptr<minifi::Connection> FlowConfiguration::createConnection(std::string name, uuid_t uuid) {
+std::shared_ptr<minifi::Connection> FlowConfiguration::createConnection(std::string name, utils::Identifier & uuid) {
return std::make_shared<minifi::Connection>(flow_file_repo_, content_repo_, name, uuid);
}
-std::shared_ptr<core::controller::ControllerServiceNode> FlowConfiguration::createControllerService(const std::string &class_name, const std::string &name, uuid_t uuid) {
+std::shared_ptr<core::controller::ControllerServiceNode> FlowConfiguration::createControllerService(const std::string &class_name, const std::string &name, utils::Identifier & uuid) {
std::shared_ptr<core::controller::ControllerServiceNode> controllerServicesNode = service_provider_->createControllerService(class_name, name, true);
if (nullptr != controllerServicesNode)
controllerServicesNode->setUUID(uuid);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/FlowFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp
index 3427559..d41d9b0 100644
--- a/libminifi/src/core/FlowFile.cpp
+++ b/libminifi/src/core/FlowFile.cpp
@@ -33,7 +33,7 @@ std::shared_ptr<utils::IdGenerator> FlowFile::id_generator_ = utils::IdGenerator
std::shared_ptr<logging::Logger> FlowFile::logger_ = logging::LoggerFactory<FlowFile>::getLogger();
FlowFile::FlowFile()
- : Connectable("FlowFile", 0),
+ : Connectable("FlowFile"),
size_(0),
id_(0),
stored(false),
@@ -54,7 +54,7 @@ FlowFile::~FlowFile() {
}
FlowFile& FlowFile::operator=(const FlowFile& other) {
- uuid_copy(uuid_, other.uuid_);
+ uuid_ = other.uuid_;
stored = other.stored;
marked_delete_ = other.marked_delete_;
entry_date_ = other.entry_date_;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index 205f9f2..7df1b09 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -18,7 +18,6 @@
* limitations under the License.
*/
#include "core/ProcessGroup.h"
-#include <sys/time.h>
#include <time.h>
#include <vector>
#include <memory>
@@ -39,17 +38,40 @@ namespace core {
std::shared_ptr<utils::IdGenerator> ProcessGroup::id_generator_ = utils::IdGenerator::getIdGenerator();
-ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, int version, ProcessGroup *parent)
+ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, utils::Identifier &uuid)
+ : ProcessGroup(type, name, uuid, 0, 0) {
+}
+
+ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, utils::Identifier &uuid, int version)
+ : ProcessGroup(type, name, uuid, version, 0) {
+}
+
+ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, utils::Identifier &uuid, int version, ProcessGroup *parent)
: logger_(logging::LoggerFactory<ProcessGroup>::getLogger()),
name_(name),
type_(type),
config_version_(version),
parent_process_group_(parent) {
- if (!uuid)
- // Generate the global UUID for the flow record
+ if (uuid == nullptr) {
id_generator_->generate(uuid_);
- else
- uuid_copy(uuid_, uuid);
+ } else {
+ uuid_ = uuid;
+ }
+
+ yield_period_msec_ = 0;
+ transmitting_ = false;
+ transport_protocol_ = "RAW";
+
+ logger_->log_debug("ProcessGroup %s created", name_);
+}
+
+ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name)
+ : logger_(logging::LoggerFactory<ProcessGroup>::getLogger()),
+ name_(name),
+ type_(type),
+ config_version_(0),
+ parent_process_group_(0) {
+ id_generator_->generate(uuid_);
yield_period_msec_ = 0;
transmitting_ = false;
@@ -168,20 +190,15 @@ void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, Eve
}
}
-std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) {
+std::shared_ptr<Processor> ProcessGroup::findProcessor(utils::Identifier &uuid) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
std::shared_ptr<Processor> ret = NULL;
for (auto processor : processors_) {
logger_->log_debug("find processor %s", processor->getName());
- uuid_t processorUUID;
+ utils::Identifier processorUUID;
if (processor->getUUID(processorUUID)) {
- char uuid_str[37]; // ex. "1b4e28ba-2fa1-11d2-883f-0016d3cca427" + "\0"
- uuid_unparse_lower(processorUUID, uuid_str);
- std::string processorUUIDstr = uuid_str;
- uuid_unparse_lower(uuid, uuid_str);
- std::string uuidStr = uuid_str;
- if (processorUUIDstr == uuidStr) {
+ if (uuid == processorUUID) {
return processor;
}
}
@@ -277,14 +294,14 @@ void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) {
// We do not have the same connection in this process group yet
connections_.insert(connection);
logger_->log_debug("Add connection %s into process group %s", connection->getName(), name_);
- uuid_t sourceUUID;
+ utils::Identifier sourceUUID;
std::shared_ptr<Processor> source = NULL;
connection->getSourceUUID(sourceUUID);
source = this->findProcessor(sourceUUID);
if (source)
source->addConnection(connection);
std::shared_ptr<Processor> destination = NULL;
- uuid_t destinationUUID;
+ utils::Identifier destinationUUID;
connection->getDestinationUUID(destinationUUID);
destination = this->findProcessor(destinationUUID);
if (destination && destination != source)
@@ -299,14 +316,14 @@ void ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) {
// We do not have the same connection in this process group yet
connections_.erase(connection);
logger_->log_debug("Remove connection %s into process group %s", connection->getName(), name_);
- uuid_t sourceUUID;
+ utils::Identifier sourceUUID;
std::shared_ptr<Processor> source = NULL;
connection->getSourceUUID(sourceUUID);
source = this->findProcessor(sourceUUID);
if (source)
source->removeConnection(connection);
std::shared_ptr<Processor> destination = NULL;
- uuid_t destinationUUID;
+ utils::Identifier destinationUUID;
connection->getDestinationUUID(destinationUUID);
destination = this->findProcessor(destinationUUID);
if (destination && destination != source)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 1bfa9f8..dc45446 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -19,7 +19,6 @@
*/
#include "core/ProcessSession.h"
#include "core/ProcessSessionReadCallback.h"
-#include <sys/time.h>
#include <time.h>
#include <vector>
#include <queue>
@@ -31,6 +30,25 @@
#include <thread>
#include <iostream>
#include <uuid/uuid.h>
+/* This implementation is only for native Windows systems. */
+#if (defined _WIN32 || defined __WIN32__) && !defined __CYGWIN__
+#define _WINSOCKAPI_
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif
+#include <WinSock2.h>
+#include <WS2tcpip.h>
+#include <Windows.h>
+#pragma comment(lib, "Ws2_32.lib")
+#include <direct.h>
+
+int getpagesize(void) {
+ return 4096;
+ // SYSTEM_INFO system_info;
+ // GetSystemInfo(&system_info);
+ // return system_info.dwPageSize;
+}
+#endif
namespace org {
namespace apache {
@@ -619,12 +637,10 @@ bool ProcessSession::exportContent(const std::string &destination, const std::st
}
bool ProcessSession::exportContent(const std::string &destination, const std::shared_ptr<core::FlowFile> &flow, bool keepContent) {
- char tmpFileUuidStr[37];
- uuid_t tmpFileUuid;
+ utils::Identifier tmpFileUuid;
id_generator_->generate(tmpFileUuid);
- uuid_unparse_lower(tmpFileUuid, tmpFileUuidStr);
std::stringstream tmpFileSs;
- tmpFileSs << destination << "." << tmpFileUuidStr;
+ tmpFileSs << destination << "." << tmpFileUuid.to_string();
std::string tmpFileName = tmpFileSs.str();
return exportContent(destination, tmpFileName, flow, keepContent);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 7c0fe25..480ae58 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -18,7 +18,6 @@
* limitations under the License.
*/
#include "core/Processor.h"
-#include <sys/time.h>
#include <time.h>
#include <vector>
#include <queue>
@@ -45,8 +44,8 @@ namespace nifi {
namespace minifi {
namespace core {
-Processor::Processor(std::string name, uuid_t uuid)
- : Connectable(name, uuid),
+Processor::Processor(std::string name)
+ : Connectable(name),
ConfigurableComponent(),
logger_(logging::LoggerFactory<Processor>::getLogger()) {
has_work_.store(false);
@@ -66,6 +65,27 @@ Processor::Processor(std::string name, uuid_t uuid)
logger_->log_debug("Processor %s created UUID %s", name_, uuidStr_);
}
+Processor::Processor(std::string name, utils::Identifier &uuid)
+ : Connectable(name, uuid),
+ ConfigurableComponent(),
+ logger_(logging::LoggerFactory<Processor>::getLogger()) {
+ has_work_.store(false);
+ // Setup the default values
+ state_ = DISABLED;
+ strategy_ = TIMER_DRIVEN;
+ loss_tolerant_ = false;
+ _triggerWhenEmpty = false;
+ scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS;
+ run_duration_nano_ = DEFAULT_RUN_DURATION;
+ yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
+ _penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000;
+ max_concurrent_tasks_ = DEFAULT_MAX_CONCURRENT_TASKS;
+ active_tasks_ = 0;
+ yield_expiration_ = 0;
+ incoming_connections_Iter = this->_incomingConnections.begin();
+ logger_->log_debug("Processor %s created UUID %s with uuid %s", name_, uuidStr_, uuid.to_string());
+}
+
bool Processor::isRunning() {
return (state_ == RUNNING && active_tasks_ > 0);
}
@@ -87,17 +107,13 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
std::lock_guard<std::mutex> lock(mutex_);
- uuid_t srcUUID;
- uuid_t destUUID;
+ utils::Identifier srcUUID;
+ utils::Identifier destUUID;
connection->getSourceUUID(srcUUID);
connection->getDestinationUUID(destUUID);
- char uuid_str[37];
-
- uuid_unparse_lower(uuid_, uuid_str);
- std::string my_uuid = uuid_str;
- uuid_unparse_lower(destUUID, uuid_str);
- std::string destination_uuid = uuid_str;
+ std::string my_uuid = uuid_.to_string();
+ std::string destination_uuid = destUUID.to_string();
if (my_uuid == destination_uuid) {
// Connection is destination to the current processor
if (_incomingConnections.find(connection) == _incomingConnections.end()) {
@@ -108,8 +124,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
ret = true;
}
}
- uuid_unparse_lower(srcUUID, uuid_str);
- std::string source_uuid = uuid_str;
+ std::string source_uuid = srcUUID.to_string();
if (my_uuid == source_uuid) {
std::string relationship = connection->getRelationship().getName();
// Connection is source from the current processor
@@ -147,15 +162,15 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
std::lock_guard<std::mutex> lock(mutex_);
- uuid_t srcUUID;
- uuid_t destUUID;
+ utils::Identifier srcUUID;
+ utils::Identifier destUUID;
std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
connection->getSourceUUID(srcUUID);
connection->getDestinationUUID(destUUID);
- if (uuid_compare(uuid_, destUUID) == 0) {
+ if (uuid_ == destUUID) {
// Connection is destination to the current processor
if (_incomingConnections.find(connection) != _incomingConnections.end()) {
_incomingConnections.erase(connection);
@@ -165,7 +180,7 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
}
}
- if (uuid_compare(uuid_, srcUUID) == 0) {
+ if (uuid_ == srcUUID) {
std::string relationship = connection->getRelationship().getName();
// Connection is source from the current processor
auto &&it = out_going_connections_.find(relationship);