You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/03/28 17:19:18 UTC
[13/16] nifi-minifi-cpp git commit: MINIFI-217: Updates namespaces
and removes use of raw pointers for user facing API.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/Site2SitePeer.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Site2SitePeer.h b/libminifi/include/Site2SitePeer.h
index e89bb74..de3a42f 100644
--- a/libminifi/include/Site2SitePeer.h
+++ b/libminifi/include/Site2SitePeer.h
@@ -29,242 +29,261 @@
#include <mutex>
#include <atomic>
#include <memory>
-#include "Logger.h"
-#include "Configure.h"
-#include "Property.h"
+
+#include "core/Property.h"
+#include "core/logging/Logger.h"
+#include "properties/Configure.h"
#include "io/ClientSocket.h"
#include "io/BaseStream.h"
#include "utils/TimeUtil.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
static const char MAGIC_BYTES[] = { 'N', 'i', 'F', 'i' };
-//! Site2SitePeer Class
-class Site2SitePeer : public BaseStream{
-public:
-
+// Site2SitePeer Class
+class Site2SitePeer : public org::apache::nifi::minifi::io::BaseStream {
+ public:
+
+ Site2SitePeer()
+ : stream_(nullptr),
+ host_(""),
+ port_(-1) {
+
+ }
+ /*
+ * Create a new site2site peer
+ */
+ explicit Site2SitePeer(
+ std::unique_ptr<org::apache::nifi::minifi::io::DataStream> injected_socket,
+ const std::string host_, uint16_t port_)
+ : host_(host_),
+ port_(port_),
+ stream_(injected_socket.release()) {
+ logger_ = logging::Logger::getLogger();
+ configure_ = Configure::getConfigure();
+ _yieldExpiration = 0;
+ _timeOut = 30000; // 30 seconds
+ _url = "nifi://" + host_ + ":" + std::to_string(port_);
+ }
- Site2SitePeer() : stream_(nullptr),host_(""),port_(-1){
-
- }
- /*
- * Create a new site2site peer
- */
- explicit Site2SitePeer(std::unique_ptr<DataStream> injected_socket, const std::string host_, uint16_t port_ ) :
- host_(host_), port_(port_), stream_(injected_socket.release()){
- logger_ = Logger::getLogger();
- configure_ = Configure::getConfigure();
- _yieldExpiration = 0;
- _timeOut = 30000; // 30 seconds
- _url = "nifi://" + host_ + ":" + std::to_string(port_);
- }
-
- explicit Site2SitePeer(Site2SitePeer &&ss) : stream_( ss.stream_.release()), host_( std::move(ss.host_) ), port_ (std::move(ss.port_) )
- {
- logger_ = Logger::getLogger();
- configure_ = Configure::getConfigure();
- _yieldExpiration.store(ss._yieldExpiration);
- _timeOut.store(ss._timeOut);
- _url = std::move(ss._url);
- }
- //! Destructor
- virtual ~Site2SitePeer() {
- Close();
- }
- //! Set Processor yield period in MilliSecond
- void setYieldPeriodMsec(uint64_t period) {
- _yieldPeriodMsec = period;
- }
- //! get URL
- std::string getURL() {
- return _url;
- }
- //! Get Processor yield period in MilliSecond
- uint64_t getYieldPeriodMsec(void) {
- return (_yieldPeriodMsec);
- }
- //! Yield based on the yield period
- void yield() {
- _yieldExpiration = (getTimeMillis() + _yieldPeriodMsec);
- }
- //! setHostName
- void setHostName(std::string host_) {
- this->host_ = host_;
- _url = "nifi://" + host_ + ":" + std::to_string(port_);
- }
- //! setPort
- void setPort(uint16_t port_) {
- this->port_ = port_;
- _url = "nifi://" + host_ + ":" + std::to_string(port_);
- }
- //! getHostName
- std::string getHostName() {
- return host_;
- }
- //! getPort
- uint16_t getPort() {
- return port_;
- }
- //! Yield based on the input time
- void yield(uint64_t time) {
- _yieldExpiration = (getTimeMillis() + time);
- }
- //! whether need be to yield
- bool isYield() {
- if (_yieldExpiration > 0)
- return (_yieldExpiration >= getTimeMillis());
- else
- return false;
- }
- // clear yield expiration
- void clearYield() {
- _yieldExpiration = 0;
- }
- //! Yield based on the yield period
- void yield(std::string portId) {
- std::lock_guard<std::mutex> lock(_mtx);
- uint64_t yieldExpiration = (getTimeMillis() + _yieldPeriodMsec);
- _yieldExpirationPortIdMap[portId] = yieldExpiration;
- }
- //! Yield based on the input time
- void yield(std::string portId, uint64_t time) {
- std::lock_guard<std::mutex> lock(_mtx);
- uint64_t yieldExpiration = (getTimeMillis() + time);
- _yieldExpirationPortIdMap[portId] = yieldExpiration;
- }
- //! whether need be to yield
- bool isYield(std::string portId) {
- std::lock_guard<std::mutex> lock(_mtx);
- std::map<std::string, uint64_t>::iterator it =
- this->_yieldExpirationPortIdMap.find(portId);
- if (it != _yieldExpirationPortIdMap.end()) {
- uint64_t yieldExpiration = it->second;
- return (yieldExpiration >= getTimeMillis());
- } else {
- return false;
- }
- }
- //! clear yield expiration
- void clearYield(std::string portId) {
- std::lock_guard<std::mutex> lock(_mtx);
- std::map<std::string, uint64_t>::iterator it =
- this->_yieldExpirationPortIdMap.find(portId);
- if (it != _yieldExpirationPortIdMap.end()) {
- _yieldExpirationPortIdMap.erase(portId);
- }
- }
- //! setTimeOut
- void setTimeOut(uint64_t time) {
- _timeOut = time;
- }
- //! getTimeOut
- uint64_t getTimeOut() {
- return _timeOut;
- }
- int write(uint8_t value) {
- return Serializable::write(value,stream_.get());
- }
- int write(char value) {
- return Serializable::write(value,stream_.get());
- }
- int write(uint32_t value) {
+ explicit Site2SitePeer(Site2SitePeer &&ss)
+ : stream_(ss.stream_.release()),
+ host_(std::move(ss.host_)),
+ port_(std::move(ss.port_)) {
+ logger_ = logging::Logger::getLogger();
+ configure_ = Configure::getConfigure();
+ _yieldExpiration.store(ss._yieldExpiration);
+ _timeOut.store(ss._timeOut);
+ _url = std::move(ss._url);
+ }
+ // Destructor
+ virtual ~Site2SitePeer() {
+ Close();
+ }
+ // Set Processor yield period in MilliSecond
+ void setYieldPeriodMsec(uint64_t period) {
+ _yieldPeriodMsec = period;
+ }
+ // get URL
+ std::string getURL() {
+ return _url;
+ }
+ // Get Processor yield period in MilliSecond
+ uint64_t getYieldPeriodMsec(void) {
+ return (_yieldPeriodMsec);
+ }
+ // Yield based on the yield period
+ void yield() {
+ _yieldExpiration = (getTimeMillis() + _yieldPeriodMsec);
+ }
+ // setHostName
+ void setHostName(std::string host_) {
+ this->host_ = host_;
+ _url = "nifi://" + host_ + ":" + std::to_string(port_);
+ }
+ // setPort
+ void setPort(uint16_t port_) {
+ this->port_ = port_;
+ _url = "nifi://" + host_ + ":" + std::to_string(port_);
+ }
+ // getHostName
+ std::string getHostName() {
+ return host_;
+ }
+ // getPort
+ uint16_t getPort() {
+ return port_;
+ }
+ // Yield based on the input time
+ void yield(uint64_t time) {
+ _yieldExpiration = (getTimeMillis() + time);
+ }
+ // whether the peer is still within its yield period
+ bool isYield() {
+ if (_yieldExpiration > 0)
+ return (_yieldExpiration >= getTimeMillis());
+ else
+ return false;
+ }
+ // clear yield expiration
+ void clearYield() {
+ _yieldExpiration = 0;
+ }
+ // Yield based on the yield period
+ void yield(std::string portId) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ uint64_t yieldExpiration = (getTimeMillis() + _yieldPeriodMsec);
+ _yieldExpirationPortIdMap[portId] = yieldExpiration;
+ }
+ // Yield based on the input time
+ void yield(std::string portId, uint64_t time) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ uint64_t yieldExpiration = (getTimeMillis() + time);
+ _yieldExpirationPortIdMap[portId] = yieldExpiration;
+ }
+ // whether the given port id is still within its yield period
+ bool isYield(std::string portId) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ std::map<std::string, uint64_t>::iterator it = this
+ ->_yieldExpirationPortIdMap.find(portId);
+ if (it != _yieldExpirationPortIdMap.end()) {
+ uint64_t yieldExpiration = it->second;
+ return (yieldExpiration >= getTimeMillis());
+ } else {
+ return false;
+ }
+ }
+ // clear yield expiration
+ void clearYield(std::string portId) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ std::map<std::string, uint64_t>::iterator it = this
+ ->_yieldExpirationPortIdMap.find(portId);
+ if (it != _yieldExpirationPortIdMap.end()) {
+ _yieldExpirationPortIdMap.erase(portId);
+ }
+ }
+ // setTimeOut
+ void setTimeOut(uint64_t time) {
+ _timeOut = time;
+ }
+ // getTimeOut
+ uint64_t getTimeOut() {
+ return _timeOut;
+ }
+ int write(uint8_t value) {
+ return Serializable::write(value, stream_.get());
+ }
+ int write(char value) {
+ return Serializable::write(value, stream_.get());
+ }
+ int write(uint32_t value) {
- return Serializable::write(value,stream_.get());
+ return Serializable::write(value, stream_.get());
- }
- int write(uint16_t value) {
- return Serializable::write(value,stream_.get());
- }
- int write(uint8_t *value, int len) {
- return Serializable::write(value,len,stream_.get());
- }
- int write(uint64_t value) {
- return Serializable::write(value,stream_.get());
- }
- int write(bool value) {
- uint8_t temp = value;
- return Serializable::write(temp,stream_.get());
- }
- int writeUTF(std::string str, bool widen = false){
- return Serializable::writeUTF(str,stream_.get(),widen);
- }
- int read(uint8_t &value) {
- return Serializable::read(value,stream_.get());
- }
- int read(uint16_t &value) {
- return Serializable::read(value,stream_.get());
- }
- int read(char &value) {
- return Serializable::read(value,stream_.get());
- }
- int read(uint8_t *value, int len) {
- return Serializable::read(value,len,stream_.get());
- }
- int read(uint32_t &value) {
- return Serializable::read(value,stream_.get());
- }
- int read(uint64_t &value) {
- return Serializable::read(value,stream_.get());
- }
- int readUTF(std::string &str, bool widen = false)
- {
- return Serializable::readUTF(str,stream_.get(),widen);
- }
- //! open connection to the peer
- bool Open();
- //! close connection to the peer
- void Close();
-
- /**
- * Move assignment operator.
- */
- Site2SitePeer& operator=(Site2SitePeer&& other)
- {
- stream_ = std::unique_ptr<DataStream>( other.stream_.release());
- host_ = std::move(other.host_);
- port_ = std::move(other.port_);
- logger_ = Logger::getLogger();
- configure_ = Configure::getConfigure();
- _yieldExpiration = 0;
- _timeOut = 30000; // 30 seconds
- _url = "nifi://" + host_ + ":" + std::to_string(port_);
-
- return *this;
- }
+ }
+ int write(uint16_t value) {
+ return Serializable::write(value, stream_.get());
+ }
+ int write(uint8_t *value, int len) {
+ return Serializable::write(value, len, stream_.get());
+ }
+ int write(uint64_t value) {
+ return Serializable::write(value, stream_.get());
+ }
+ int write(bool value) {
+ uint8_t temp = value;
+ return Serializable::write(temp, stream_.get());
+ }
+ int writeUTF(std::string str, bool widen = false) {
+ return Serializable::writeUTF(str, stream_.get(), widen);
+ }
+ int read(uint8_t &value) {
+ return Serializable::read(value, stream_.get());
+ }
+ int read(uint16_t &value) {
+ return Serializable::read(value, stream_.get());
+ }
+ int read(char &value) {
+ return Serializable::read(value, stream_.get());
+ }
+ int read(uint8_t *value, int len) {
+ return Serializable::read(value, len, stream_.get());
+ }
+ int read(uint32_t &value) {
+ return Serializable::read(value, stream_.get());
+ }
+ int read(uint64_t &value) {
+ return Serializable::read(value, stream_.get());
+ }
+ int readUTF(std::string &str, bool widen = false) {
+ return org::apache::nifi::minifi::io::Serializable::readUTF(str,
+ stream_.get(),
+ widen);
+ }
+ // open connection to the peer
+ bool Open();
+ // close connection to the peer
+ void Close();
- Site2SitePeer(const Site2SitePeer &parent) = delete;
- Site2SitePeer &operator=(const Site2SitePeer &parent) = delete;
+ /**
+ * Move assignment operator.
+ */
+ Site2SitePeer& operator=(Site2SitePeer&& other) {
+ stream_ = std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(
+ other.stream_.release());
+ host_ = std::move(other.host_);
+ port_ = std::move(other.port_);
+ logger_ = logging::Logger::getLogger();
+ configure_ = Configure::getConfigure();
+ _yieldExpiration = 0;
+ _timeOut = 30000; // 30 seconds
+ _url = "nifi://" + host_ + ":" + std::to_string(port_);
-protected:
+ return *this;
+ }
-private:
+ Site2SitePeer(const Site2SitePeer &parent) = delete;
+ Site2SitePeer &operator=(const Site2SitePeer &parent) = delete;
- std::unique_ptr<DataStream> stream_;
+ protected:
- std::string host_;
- uint16_t port_;
+ private:
- //! Mutex for protection
- std::mutex _mtx;
- //! URL
- std::string _url;
- //! socket timeout;
- std::atomic<uint64_t> _timeOut;
- //! Logger
- std::shared_ptr<Logger> logger_;
- //! Configure
- Configure *configure_;
- //! Yield Period in Milliseconds
- std::atomic<uint64_t> _yieldPeriodMsec;
- //! Yield Expiration
- std::atomic<uint64_t> _yieldExpiration;
- //! Yield Expiration per destination PortID
- std::map<std::string, uint64_t> _yieldExpirationPortIdMap;
- //! OpenSSL connection state
- // Prevent default copy constructor and assignment operation
- // Only support pass by reference or pointer
+ std::unique_ptr<org::apache::nifi::minifi::io::DataStream> stream_;
+
+ std::string host_;
+ uint16_t port_;
+
+ // Mutex for protection
+ std::mutex mutex_;
+ // URL
+ std::string _url;
+ // socket timeout;
+ std::atomic<uint64_t> _timeOut;
+ // Logger
+ std::shared_ptr<logging::Logger> logger_;
+ // Configure
+ Configure *configure_;
+ // Yield Period in Milliseconds
+ std::atomic<uint64_t> _yieldPeriodMsec;
+ // Yield Expiration
+ std::atomic<uint64_t> _yieldExpiration;
+ // Yield Expiration per destination PortID
+ std::map<std::string, uint64_t> _yieldExpirationPortIdMap;
+ // OpenSSL connection state
+ // Prevent default copy constructor and assignment operation
+ // Only support pass by reference or pointer
};
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/TailFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/TailFile.h b/libminifi/include/TailFile.h
deleted file mode 100644
index d68748e..0000000
--- a/libminifi/include/TailFile.h
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * @file TailFile.h
- * TailFile class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __TAIL_FILE_H__
-#define __TAIL_FILE_H__
-
-#include "FlowFileRecord.h"
-#include "Processor.h"
-#include "ProcessSession.h"
-
-//! TailFile Class
-class TailFile : public Processor
-{
-public:
- //! Constructor
- /*!
- * Create a new processor
- */
- TailFile(std::string name, uuid_t uuid = NULL)
- : Processor(name, uuid)
- {
- logger_ = Logger::getLogger();
- _stateRecovered = false;
- }
- //! Destructor
- virtual ~TailFile()
- {
- storeState();
- }
- //! Processor Name
- static const std::string ProcessorName;
- //! Supported Properties
- static Property FileName;
- static Property StateFile;
- //! Supported Relationships
- static Relationship Success;
-
-public:
- //! OnTrigger method, implemented by NiFi TailFile
- virtual void onTrigger(ProcessContext *context, ProcessSession *session);
- //! Initialize, over write by NiFi TailFile
- virtual void initialize(void);
- //! recoverState
- void recoverState();
- //! storeState
- void storeState();
-
-protected:
-
-private:
- //! Logger
- std::shared_ptr<Logger> logger_;
- std::string _fileLocation;
- //! Property Specified Tailed File Name
- std::string _fileName;
- //! File to save state
- std::string _stateFile;
- //! State related to the tailed file
- std::string _currentTailFileName;
- uint64_t _currentTailFilePosition;
- bool _stateRecovered;
- uint64_t _currentTailFileCreatedTime;
- //! Utils functions for parse state file
- std::string trimLeft(const std::string& s);
- std::string trimRight(const std::string& s);
- void parseStateFileLine(char *buf);
- void checkRollOver();
-
-};
-
-//! Matched File Item for Roll over check
-typedef struct {
- std::string fileName;
- uint64_t modifiedTime;
-} TailMatchedFileItem;
-
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/ThreadedSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h
index 5eb5d8a..4e39da3 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -20,51 +20,60 @@
#ifndef __THREADED_SCHEDULING_AGENT_H__
#define __THREADED_SCHEDULING_AGENT_H__
-#include "Configure.h"
-#include "Logger.h"
-#include "Processor.h"
-#include "ProcessContext.h"
+#include "properties/Configure.h"
+#include "core/logging/Logger.h"
+#include "core/Processor.h"
+#include "core/Repository.h"
+#include "core/ProcessContext.h"
#include "SchedulingAgent.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
/**
* An abstract scheduling agent which creates and manages a pool of threads for
* each processor scheduled.
*/
-class ThreadedSchedulingAgent : public SchedulingAgent
-{
-public:
- //! Constructor
- /*!
- * Create a new processor
- */
- ThreadedSchedulingAgent()
- : SchedulingAgent()
- {
- }
- //! Destructor
- virtual ~ThreadedSchedulingAgent()
- {
- }
+class ThreadedSchedulingAgent : public SchedulingAgent {
+ public:
+ // Constructor
+ /*!
+ * Create a new threaded scheduling agent
+ */
+ ThreadedSchedulingAgent(std::shared_ptr<core::Repository> repo)
+ : SchedulingAgent(repo) {
+ }
+ // Destructor
+ virtual ~ThreadedSchedulingAgent() {
+ }
- //! Run function for the thread
- virtual void run(Processor *processor, ProcessContext *processContext, ProcessSessionFactory *sessionFactory) = 0;
+ // Run function for the thread
+ virtual void run(std::shared_ptr<core::Processor> processor,
+ core::ProcessContext *processContext,
+ core::ProcessSessionFactory *sessionFactory) = 0;
-public:
- //! schedule, overwritten by different DrivenTimerDrivenSchedulingAgent
- virtual void schedule(Processor *processor);
- //! unschedule, overwritten by different DrivenTimerDrivenSchedulingAgent
- virtual void unschedule(Processor *processor);
+ public:
+ // schedule, overwritten by different DrivenTimerDrivenSchedulingAgent
+ virtual void schedule(std::shared_ptr<core::Processor> processor);
+ // unschedule, overwritten by different DrivenTimerDrivenSchedulingAgent
+ virtual void unschedule(std::shared_ptr<core::Processor> processor);
-protected:
- //! Threads
- std::map<std::string, std::vector<std::thread *>> _threads;
+ protected:
+ // Threads
+ std::map<std::string, std::vector<std::thread *>> _threads;
-private:
- // Prevent default copy constructor and assignment operation
- // Only support pass by reference or pointer
- ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent);
- ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent);
+ private:
+ // Prevent default copy constructor and assignment operation
+ // Only support pass by reference or pointer
+ ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent);
+ ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent);
};
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/TimerDrivenSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h
index 389ccf6..7da2abd 100644
--- a/libminifi/include/TimerDrivenSchedulingAgent.h
+++ b/libminifi/include/TimerDrivenSchedulingAgent.h
@@ -20,36 +20,47 @@
#ifndef __TIMER_DRIVEN_SCHEDULING_AGENT_H__
#define __TIMER_DRIVEN_SCHEDULING_AGENT_H__
-#include "Logger.h"
-#include "Processor.h"
-#include "ProcessContext.h"
+#include "core/logging/Logger.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/Repository.h"
#include "ThreadedSchedulingAgent.h"
-//! TimerDrivenSchedulingAgent Class
-class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent
-{
-public:
- //! Constructor
- /*!
- * Create a new processor
- */
- TimerDrivenSchedulingAgent()
- : ThreadedSchedulingAgent()
- {
- }
- //! Destructor
- virtual ~TimerDrivenSchedulingAgent()
- {
- }
- //! Run function for the thread
- void run(Processor *processor, ProcessContext *processContext, ProcessSessionFactory *sessionFactory);
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+// TimerDrivenSchedulingAgent Class
+class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent {
+ public:
+ // Constructor
+ /*!
+ * Create a new timer driven scheduling agent
+ */
+ TimerDrivenSchedulingAgent(std::shared_ptr<core::Repository> repo)
+ : ThreadedSchedulingAgent(repo) {
+ }
+ // Destructor
+ virtual ~TimerDrivenSchedulingAgent() {
+ }
+ /**
+ * Run function that accepts the processor, context and session factory.
+ */
+ void run(std::shared_ptr<core::Processor> processor,
+ core::ProcessContext *processContext,
+ core::ProcessSessionFactory *sessionFactory);
-private:
- // Prevent default copy constructor and assignment operation
- // Only support pass by reference or pointer
- TimerDrivenSchedulingAgent(const TimerDrivenSchedulingAgent &parent);
- TimerDrivenSchedulingAgent &operator=(const TimerDrivenSchedulingAgent &parent);
+ private:
+ // Prevent default copy constructor and assignment operation
+ // Only support pass by reference or pointer
+ TimerDrivenSchedulingAgent(const TimerDrivenSchedulingAgent &parent);
+ TimerDrivenSchedulingAgent &operator=(
+ const TimerDrivenSchedulingAgent &parent);
};
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/ConfigurableComponent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ConfigurableComponent.h b/libminifi/include/core/ConfigurableComponent.h
new file mode 100644
index 0000000..c0cc623
--- /dev/null
+++ b/libminifi/include/core/ConfigurableComponent.h
@@ -0,0 +1,104 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBMINIFI_INCLUDE_CORE_CONFIGURABLECOMPONENT_H_
+#define LIBMINIFI_INCLUDE_CORE_CONFIGURABLECOMPONENT_H_
+
+#include "Property.h"
+#include "core/logging/Logger.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+/**
+ * Represents a configurable component
+ * Purpose: Extracts configuration items for all components and localizes them
+ */
+class ConfigurableComponent {
+ public:
+ // Default construction is deleted; instances are built through the
+ // explicit constructors declared below.
+ ConfigurableComponent() = delete;
+
+ // Constructs a component from a shared logger.
+ explicit ConfigurableComponent(std::shared_ptr<logging::Logger> logger);
+ // Constructs from an rvalue; note the parameter is a const rvalue reference.
+ explicit ConfigurableComponent(const ConfigurableComponent &&other);
+
+ /**
+ * Get property using the provided name.
+ * @param name property name.
+ * @param value string passed in by non-const reference
+ * @return result of getting property.
+ */
+ bool getProperty(const std::string name, std::string &value);
+ /**
+ * Sets the property using the provided name
+ * @param name property name
+ * @param value property value.
+ * @return result of setting property.
+ */
+ bool setProperty(const std::string name, std::string value);
+ /**
+ * Sets the property using the provided Property object
+ * @param prop property to set
+ * @param value property value.
+ * @return whether property was set or not
+ */
+ bool setProperty(Property &prop, std::string value);
+
+ /**
+ * Sets supported properties for the ConfigurableComponent
+ * @param properties supported properties
+ * @return result of set operation.
+ */
+ bool setSupportedProperties(std::set<Property> properties);
+ /**
+ * Destructor. Declared virtual; the class also declares the pure virtual
+ * canEdit(), so it can only be used as a base class.
+ * Only declared here; the definition is not part of this header.
+ */
+
+ virtual ~ConfigurableComponent();
+
+ protected:
+
+
+ /**
+ * Returns true if the instance can be edited.
+ * @return true/false
+ */
+ virtual bool canEdit()= 0;
+ // NOTE(review): presumably guards properties_ - confirm in the .cpp
+ std::mutex configuration_mutex_;
+ std::shared_ptr<logging::Logger> logger_;
+ // Supported properties, keyed by string (presumably the property name)
+ std::map<std::string, Property> properties_;
+
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_CONFIGURABLECOMPONENT_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/ConfigurationFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ConfigurationFactory.h b/libminifi/include/core/ConfigurationFactory.h
new file mode 100644
index 0000000..19ed5f4
--- /dev/null
+++ b/libminifi/include/core/ConfigurationFactory.h
@@ -0,0 +1,65 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBMINIFI_INCLUDE_CORE_CONFIGURATIONFACTORY_H_
+#define LIBMINIFI_INCLUDE_CORE_CONFIGURATIONFACTORY_H_
+
+#include "FlowConfiguration.h"
+#include <type_traits>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+/**
+ * Overload selected when class_operations<T>::value is false: always throws.
+ */
+template<typename T>
+typename std::enable_if<!class_operations<T>::value, T*>::type instantiate(
+ std::shared_ptr<core::Repository> repo,
+ std::shared_ptr<core::Repository> flow_file_repo,const std::string path ) {
+ throw std::runtime_error("Cannot instantiate class");
+}
+// Overload selected when class_operations<T>::value is true: returns a raw pointer obtained from new.
+template<typename T>
+typename std::enable_if<class_operations<T>::value, T*>::type instantiate(
+ std::shared_ptr<core::Repository> repo,
+ std::shared_ptr<core::Repository> flow_file_repo,const std::string path ) {
+ return new T(repo,flow_file_repo,path);
+}
+/**
+ * Configuration factory: creates a new FlowConfiguration object and
+ * returns it as a std::unique_ptr.
+ * Takes two repositories and a configuration class name; path defaults to
+ * "" and fail_safe defaults to false. Only declared in this header.
+ */
+ std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(
+ std::shared_ptr<core::Repository> repo,
+ std::shared_ptr<core::Repository> flow_file_repo,
+ const std::string configuration_class_name, const std::string path = "",
+ bool fail_safe = false);
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_CONFIGURATIONFACTORY_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/Connectable.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Connectable.h b/libminifi/include/core/Connectable.h
new file mode 100644
index 0000000..15e618f
--- /dev/null
+++ b/libminifi/include/core/Connectable.h
@@ -0,0 +1,165 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBMINIFI_INCLUDE_CORE_CONNECTABLE_H_
+#define LIBMINIFI_INCLUDE_CORE_CONNECTABLE_H_
+
+#include <set>
+#include "core.h"
+#include <condition_variable>
+#include "core/logging/Logger.h"
+#include "Relationship.h"
+#include "Scheduling.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+/**
+ * Represents the base connectable component
+ * Purpose: As in NiFi, this represents a connection point and allows the derived
+ * object to be connected to other connectables.
+ */
+class Connectable : public CoreComponent {
+ public:
+ // Constructs a connectable from a name and a uuid.
+ explicit Connectable(std::string name, uuid_t uuid);
+ // Constructs from an rvalue; note the parameter is a const rvalue reference.
+ explicit Connectable(const Connectable &&other);
+ // Sets the supported relationships (bool result; meaning defined in the .cpp)
+ bool setSupportedRelationships(std::set<Relationship> relationships);
+
+ // Whether the relationship is supported
+ bool isSupportedRelationship(Relationship relationship);
+
+ /**
+ * Sets auto terminated relationships
+ * @param relationships relationships to mark as auto terminated
+ * @return result of set operation.
+ */
+ bool setAutoTerminatedRelationships(std::set<Relationship> relationships);
+
+ // Check whether the relationship is auto terminated
+ bool isAutoTerminated(Relationship relationship);
+
+ // Get the penalization period in milliseconds
+ uint64_t getPenalizationPeriodMsec(void) {
+ return (_penalizationPeriodMsec);
+ }
+
+ /**
+ * Get outgoing connections for the given relationship name
+ * @return set of outgoing connections.
+ */
+ std::set<std::shared_ptr<Connectable>> getOutGoingConnections(
+ std::string relationship);
+
+ /**
+ * Get next incoming connection
+ * @return next incoming connection
+ */
+ std::shared_ptr<Connectable> getNextIncomingConnection();
+
+ /**
+ * @return true if incoming connections > 0 (reads the set without locking)
+ */
+ bool hasIncomingConnections() {
+ return (_incomingConnections.size() > 0);
+ }
+ // Get the maximum number of concurrent tasks
+ uint8_t getMaxConcurrentTasks() {
+ return max_concurrent_tasks_;
+ }
+ // Set the maximum number of concurrent tasks (stored as uint8_t)
+ void setMaxConcurrentTasks(const uint8_t tasks) {
+ max_concurrent_tasks_ = tasks;
+ }
+ /**
+ * Yield; pure virtual, so derived classes provide the behaviour
+ */
+ virtual void yield() = 0;
+ // Virtual destructor; only declared in this header
+ virtual ~Connectable();
+
+ /**
+ * Determines if we are connected and operating
+ */
+ virtual bool isRunning() = 0;
+
+ /**
+ * Block until work is available on any input connection, or the given duration elapses
+ * @param timeoutMs timeout in milliseconds
+ */
+ void waitForWork(uint64_t timeoutMs);
+ /**
+ * Notify this connectable that work may be available
+ */
+
+ void notifyWork();
+
+ /**
+ * Determines if work is available by this connectable
+ * @return boolean if work is available.
+ */
+ virtual bool isWorkAvailable() = 0;
+
+ protected:
+
+ // Penalization Period in MilliSecond
+ std::atomic<uint64_t> _penalizationPeriodMsec;
+ // Maximum concurrent tasks; a plain uint8_t, not an atomic
+ uint8_t max_concurrent_tasks_;
+
+ // Supported relationships
+ std::map<std::string, core::Relationship> relationships_;
+ // Autoterminated relationships
+ std::map<std::string, core::Relationship> auto_terminated_relationships_;
+
+ // Incoming connection Iterator
+ std::set<std::shared_ptr<Connectable>>::iterator incoming_connections_Iter;
+ // Incoming connections
+ std::set<std::shared_ptr<Connectable>> _incomingConnections;
+ // Outgoing connections map based on Relationship name
+ std::map<std::string, std::set<std::shared_ptr<Connectable>>>_outGoingConnections;
+
+ // NOTE(review): presumably guards the relationship maps - confirm in the .cpp
+ std::mutex relationship_mutex_;
+
+ ///// work conditionals and locking mechanisms
+
+ // Concurrent condition mutex for whether there is incoming work to do
+ std::mutex work_available_mutex_;
+ // Condition for whether there is incoming work to do
+ std::atomic<bool> has_work_;
+ // Scheduling Strategy
+ std::atomic<SchedulingStrategy> strategy_;
+ // Concurrent condition variable for whether there is incoming work to do
+ std::condition_variable work_condition_;
+
+};
+
+}
+/* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_CONNECTABLE_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/FlowConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h
new file mode 100644
index 0000000..c7eedd2
--- /dev/null
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -0,0 +1,118 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_FLOWCONFIGURATION_H_
+#define LIBMINIFI_INCLUDE_CORE_FLOWCONFIGURATION_H_
+
+#include "core/core.h"
+#include "Connection.h"
+#include "RemoteProcessorGroupPort.h"
+#include "provenance/Provenance.h"
+#include "processors/GetFile.h"
+#include "processors/PutFile.h"
+#include "processors/TailFile.h"
+#include "processors/ListenSyslog.h"
+#include "processors/GenerateFlowFile.h"
+#include "processors/RealTimeDataCollector.h"
+#include "processors/ListenHTTP.h"
+#include "processors/LogAttribute.h"
+#include "processors/ExecuteProcess.h"
+#include "processors/AppendHostInfo.h"
+#include "core/Processor.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessGroup.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+/**
+ * Purpose: Flow configuration defines the mechanism
+ * by which we will configure our flow controller
+ */
+class FlowConfiguration : public CoreComponent {
+ public:
+  /**
+   * Binds the configuration to the path it will be loaded from.
+   * @param repo repository handle; not retained by this base class
+   * @param flow_file_repo flow file repository, kept in flow_file_repo_
+   * @param path configuration path, kept in config_path_
+   */
+  FlowConfiguration(std::shared_ptr<core::Repository> repo,
+                    std::shared_ptr<core::Repository> flow_file_repo,
+                    const std::string path)
+      : CoreComponent(core::getClassName<FlowConfiguration>()),
+        // Initializers follow member declaration order (avoids -Wreorder).
+        config_path_(path),
+        flow_file_repo_(flow_file_repo) {
+  }
+
+  virtual ~FlowConfiguration();
+
+  /// Creates a processor (node / input port / output port) based on the name.
+  std::shared_ptr<core::Processor> createProcessor(std::string name, uuid_t uuid);
+  /// Creates the root process group.
+  std::unique_ptr<core::ProcessGroup> createRootProcessGroup(std::string name, uuid_t uuid);
+  /// Creates a remote process group.
+  std::unique_ptr<core::ProcessGroup> createRemoteProcessGroup(std::string name, uuid_t uuid);
+  /// Creates a connection.
+  std::shared_ptr<minifi::Connection> createConnection(std::string name, uuid_t uuid);
+
+  /**
+   * Returns the configuration path string.
+   * @return reference to config_path_
+   */
+  const std::string &getConfigurationPath() {
+    return config_path_;
+  }
+
+  /**
+   * Convenience overload: forwards config_path_ to getRoot(const std::string &).
+   * @return whatever the single-argument overload returns
+   */
+  virtual std::unique_ptr<core::ProcessGroup> getRoot() {
+    return getRoot(config_path_);
+  }
+
+  /**
+   * Base implementation that returns a null root pointer.
+   * @param from_config configuration source; ignored by this base version
+   * @return nullptr here; extensions should return a non-null root pointer
+   */
+  virtual std::unique_ptr<core::ProcessGroup> getRoot(const std::string &from_config) {
+    return nullptr;
+  }
+
+ protected:
+  // configuration path
+  std::string config_path_;
+  // flow file repo
+  std::shared_ptr<core::Repository> flow_file_repo_;
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_FLOWCONFIGURATION_H_ */
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/FlowFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/FlowFile.h b/libminifi/include/core/FlowFile.h
new file mode 100644
index 0000000..247ad26
--- /dev/null
+++ b/libminifi/include/core/FlowFile.h
@@ -0,0 +1,283 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef RECORD_H
+#define RECORD_H
+
+#include "utils/TimeUtil.h"
+#include "ResourceClaim.h"
+#include "Connectable.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+class FlowFile {
+ public:
+  FlowFile();
+  ~FlowFile();
+  FlowFile& operator=(const FlowFile& other);
+
+  /**
+   * Returns a pointer to this flow file record's
+   * claim
+   */
+  std::shared_ptr<ResourceClaim> getResourceClaim();
+  /**
+   * Sets the resource claim to the inbound claim argument
+   */
+  void setResourceClaim(std::shared_ptr<ResourceClaim> &claim);
+
+  /**
+   * Clears the resource claim
+   */
+  void clearResourceClaim();
+
+  /**
+   * Get lineage identifiers
+   */
+  std::set<std::string> &getlineageIdentifiers();
+
+  /**
+   * Returns whether or not this flow file record
+   * is marked as deleted.
+   * @return marked deleted
+   */
+  bool isDeleted();
+
+  /**
+   * Sets whether to mark this flow file record
+   * as deleted
+   * @param deleted deleted flag
+   */
+  void setDeleted(const bool deleted);
+
+  /**
+   * Get entry date for this record
+   * @return entry date uint64_t
+   */
+  uint64_t getEntryDate();
+
+  /**
+   * Gets the event time.
+   * @return event time.
+   */
+  uint64_t getEventTime();
+  /**
+   * Get lineage start date
+   * @return lineage start date uint64_t
+   */
+  uint64_t getlineageStartDate();
+
+  /**
+   * Sets the lineage start date
+   * @param date new lineage start date
+   */
+  void setLineageStartDate(const uint64_t date);
+  // Replaces the stored set of lineage identifiers with the given set
+  void setLineageIdentifiers(std::set<std::string> lineage_Identifiers) {
+    lineage_Identifiers_ = lineage_Identifiers;
+  }
+  /**
+   * Obtains an attribute if it exists. If it does the value is
+   * copied into value
+   * @param key key to look for
+   * @param value value to set
+   * @return result of finding key
+   */
+  bool getAttribute(std::string key, std::string &value);
+
+  /**
+   * Updates the value in the attribute map that corresponds
+   * to key
+   * @param key attribute name
+   * @param value value to set to attribute name
+   * @return result of finding key
+   */
+  bool updateAttribute(const std::string key, const std::string value);
+
+  /**
+   * Removes the attribute
+   * @param key attribute name to remove
+   * @return result of finding key
+   */
+  bool removeAttribute(const std::string key);
+
+  /**
+   * setAttribute, if attribute already there, update it, else, add it
+   */
+  void setAttribute(const std::string &key, const std::string &value) {
+    attributes_[key] = value;
+  }
+
+  /**
+   * Returns the map of attributes
+   * @return a copy of the attribute map (returned by value).
+   */
+  std::map<std::string, std::string> getAttributes() {
+    return attributes_;
+  }
+
+  /**
+   * adds an attribute if it does not exist
+   * @return presumably false when the key already exists - TODO confirm
+   */
+  bool addAttribute(const std::string &key, const std::string &value);
+
+  /**
+   * Set the size of this record.
+   * @param size size of record to set.
+   */
+  void setSize(const uint64_t size) {
+    size_ = size;
+  }
+  /**
+   * Returns the size of corresponding flow file
+   * @return size as a uint64_t
+   */
+  uint64_t getSize();
+
+  /**
+   * Sets the offset
+   * @param offset offset to apply to this record.
+   */
+  void setOffset(const uint64_t offset) {
+    offset_ = offset;
+  }
+
+  /**
+   * Sets the penalty expiration
+   * @param penaltyExp new penalty expiration
+   */
+  void setPenaltyExpiration(const uint64_t penaltyExp) {
+    penaltyExpiration_ms_ = penaltyExp;
+  }
+  // Returns the stored penalty expiration (compared to getTimeMillis() in isPenalized)
+  uint64_t getPenaltyExpiration() {
+    return penaltyExpiration_ms_;
+  }
+
+  /**
+   * Gets the offset within the flow file
+   * @return offset as a uint64_t
+   */
+  uint64_t getOffset();
+
+  // Get the UUID as string
+  std::string getUUIDStr() {
+    return uuid_str_;
+  }
+  // Copies the UUID into other; returns false (copying nothing) when other is null
+  bool getUUID(uuid_t other) {
+    if (!other)
+      return false;
+    uuid_copy(other, uuid_);
+    return true;
+  }
+
+  // Check whether it is still being penalized (an expiration of 0 means never)
+  bool isPenalized() {
+    return (
+        penaltyExpiration_ms_ > 0 ?
+            penaltyExpiration_ms_ > getTimeMillis() : false);
+  }
+
+  /**
+   * Sets the connection with a shared pointer.
+   * @param connection shared connection.
+   */
+  void setConnection(std::shared_ptr<core::Connectable> &connection);
+
+  /**
+   * Sets the connection with a shared pointer (rvalue overload).
+   * @param connection shared connection.
+   */
+  void setConnection(std::shared_ptr<core::Connectable> &&connection);
+
+  /**
+   * Returns the connection referenced by this record.
+   * @return shared connection pointer.
+   */
+  std::shared_ptr<core::Connectable> getConnection();
+  /**
+   * Sets the original connection with a shared pointer.
+   * @param connection shared connection.
+   */
+  void setOriginalConnection(std::shared_ptr<core::Connectable> &connection);
+  /**
+   * Returns the original connection referenced by this record.
+   * @return shared original connection pointer.
+   */
+  std::shared_ptr<core::Connectable> getOriginalConnection();
+  // Sets the stored flag
+  void setStoredToRepository(bool storedInRepository) {
+    stored = storedInRepository;
+  }
+  // Returns the stored flag
+  bool isStored() {
+    return stored;
+  }
+
+ protected:
+  bool stored;
+  // Mark for deletion
+  bool marked_delete_;
+  // Date at which the flow file entered the flow
+  uint64_t entry_date_;
+  // event time
+  uint64_t event_time_;
+  // Date at which the origin of this flow file entered the flow
+  uint64_t lineage_start_date_;
+  // Date at which the flow file was queued
+  uint64_t last_queue_date_;
+  // Size in bytes of the data corresponding to this flow file
+  uint64_t size_;
+  // A global unique identifier
+  uuid_t uuid_;
+  // A local unique identifier
+  uint64_t id_;
+  // Offset to the content
+  uint64_t offset_;
+  // Penalty expiration
+  uint64_t penaltyExpiration_ms_;
+  // Attributes key/values pairs for the flow record
+  std::map<std::string, std::string> attributes_;
+  // Pointer to the associated content resource claim
+  std::shared_ptr<ResourceClaim> claim_;
+  // UUID string
+  std::string uuid_str_;
+  // UUID string for all parents
+  std::set<std::string> lineage_Identifiers_;
+
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+
+  // Connection queue that this flow file will be transferred to or is currently in
+  std::shared_ptr<core::Connectable> connection_;
+  // Original connection queue that this flow file was dequeued from
+  std::shared_ptr<core::Connectable> original_connection_;
+
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif // RECORD_H
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/ProcessContext.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h
new file mode 100644
index 0000000..1da85cd
--- /dev/null
+++ b/libminifi/include/core/ProcessContext.h
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROCESS_CONTEXT_H__
+#define __PROCESS_CONTEXT_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <algorithm>
+
+#include "Property.h"
+#include "core/logging/Logger.h"
+#include "ProcessorNode.h"
+#include "core/Repository.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+// ProcessContext Class
+class ProcessContext {
+ public:
+  /**
+   * Creates a process context around the given processor node.
+   * @param processor node copied into processor_node_
+   * @param repo repository later returned by getProvenanceRepository()
+   */
+  ProcessContext(ProcessorNode &processor,
+                 std::shared_ptr<core::Repository> repo)
+      : repo_(repo),
+        processor_node_(processor),
+        logger_(logging::Logger::getLogger()) {
+  }
+
+  virtual ~ProcessContext() {}
+
+  // Copying is disabled; share a context by reference or pointer instead.
+  ProcessContext(const ProcessContext &parent) = delete;
+  ProcessContext &operator=(const ProcessContext &parent) = delete;
+
+  /// @return the processor node held by this context
+  ProcessorNode &getProcessorNode() {
+    return processor_node_;
+  }
+
+  /// @return the repository handed to the constructor
+  std::shared_ptr<core::Repository> getProvenanceRepository() {
+    return repo_;
+  }
+
+  /// Forwards the property lookup by name to the processor node.
+  bool getProperty(std::string name, std::string &value) {
+    return processor_node_.getProperty(name, value);
+  }
+  /// Sets a property, identified by its string name, on the processor node.
+  bool setProperty(std::string name, std::string value) {
+    return processor_node_.setProperty(name, value);
+  }
+  /// Sets a property, identified by its Property object, on the processor node.
+  bool setProperty(Property prop, std::string value) {
+    return processor_node_.setProperty(prop, value);
+  }
+  /// Asks the processor node whether the relationship is supported.
+  bool isSupportedRelationship(Relationship relationship) {
+    return processor_node_.isSupportedRelationship(relationship);
+  }
+  /// Asks the processor node whether the relationship is auto terminated.
+  bool isAutoTerminated(Relationship relationship) {
+    return processor_node_.isAutoTerminated(relationship);
+  }
+  /// @return the processor node's maximum concurrent task count
+  uint8_t getMaxConcurrentTasks() {
+    return processor_node_.getMaxConcurrentTasks();
+  }
+  /// Forwards the yield request to the processor node.
+  void yield() {
+    processor_node_.yield();
+  }
+
+ private:
+  // repository shared pointer.
+  std::shared_ptr<core::Repository> repo_;
+  // Processor node this context delegates to
+  ProcessorNode processor_node_;
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/ProcessGroup.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
new file mode 100644
index 0000000..75bb0ba
--- /dev/null
+++ b/libminifi/include/core/ProcessGroup.h
@@ -0,0 +1,190 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROCESS_GROUP_H__
+#define __PROCESS_GROUP_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <algorithm>
+#include <set>
+
+#include "Processor.h"
+#include "Exception.h"
+#include "TimerDrivenSchedulingAgent.h"
+#include "EventDrivenSchedulingAgent.h"
+#include "core/logging/Logger.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+// Process Group Type
+enum ProcessGroupType {
+  ROOT_PROCESS_GROUP = 0,
+  REMOTE_PROCESS_GROUP = 1,
+  MAX_PROCESS_GROUP_TYPE = 2  // presumably an upper-bound sentinel, not a real type
+};
+
+// ProcessGroup Class
+class ProcessGroup {
+ public:
+  /*!
+   * Create a new process group.
+   * @param uuid optional identifier; may be null (the default)
+   * @param parent optional parent group; may be null (the default)
+   */
+  ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid = nullptr,
+               ProcessGroup *parent = nullptr);
+  // Destructor
+  virtual ~ProcessGroup();
+  // Set Process Group Name
+  void setName(std::string name) {
+    name_ = name;
+  }
+  // Get Process Group Name
+  std::string getName(void) {
+    return (name_);
+  }
+  // Set URL
+  void setURL(std::string url) {
+    url_ = url;
+  }
+  // Get URL
+  std::string getURL(void) {
+    return (url_);
+  }
+  // SetTransmitting
+  void setTransmitting(bool val) {
+    transmitting_ = val;
+  }
+  // Get Transmitting
+  bool getTransmitting() {
+    return transmitting_;
+  }
+  // setTimeOut
+  void setTimeOut(uint64_t time) {
+    timeOut_ = time;
+  }
+  uint64_t getTimeOut() {
+    return timeOut_;
+  }
+  // Set yield period in MilliSecond
+  void setYieldPeriodMsec(uint64_t period) {
+    yield_period_msec_ = period;
+  }
+  // Get yield period in MilliSecond
+  uint64_t getYieldPeriodMsec(void) {
+    return (yield_period_msec_);
+  }
+  // Set UUID; a null argument is ignored, mirroring the guard in getUUID
+  void setUUID(uuid_t uuid) {
+    if (uuid)
+      uuid_copy(uuid_, uuid);
+  }
+  // Get UUID; returns false (copying nothing) when the destination is null
+  bool getUUID(uuid_t uuid) {
+    if (!uuid)
+      return false;
+    uuid_copy(uuid, uuid_);
+    return true;
+  }
+  // Start Processing
+  void startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
+                       EventDrivenSchedulingAgent *eventScheduler);
+  // Stop Processing
+  void stopProcessing(TimerDrivenSchedulingAgent *timeScheduler,
+                      EventDrivenSchedulingAgent *eventScheduler);
+  // Whether it is root process group
+  bool isRootProcessGroup();
+  // set parent process group (takes mutex_)
+  void setParent(ProcessGroup *parent) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    parent_process_group_ = parent;
+  }
+  // get parent process group (takes mutex_)
+  ProcessGroup *getParent(void) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return parent_process_group_;
+  }
+  // Add processor
+  void addProcessor(std::shared_ptr<Processor> processor);
+  // Remove processor
+  void removeProcessor(std::shared_ptr<Processor> processor);
+  // Add child process group
+  void addProcessGroup(ProcessGroup *child);
+  // Remove child process group
+  void removeProcessGroup(ProcessGroup *child);
+  // Add connections
+  void addConnection(std::shared_ptr<Connection> connection);
+  // findProcessor based on UUID
+  std::shared_ptr<Processor> findProcessor(uuid_t uuid);
+  // findProcessor based on name
+  std::shared_ptr<Processor> findProcessor(const std::string &processorName);
+  // removeConnection
+  void removeConnection(std::shared_ptr<Connection> connection);
+  // update property value
+  void updatePropertyValue(std::string processorName, std::string propertyName,
+                           std::string propertyValue);
+
+  void getConnections(
+      std::map<std::string, std::shared_ptr<Connection>> &connectionMap);
+
+ protected:
+  // A global unique identifier
+  uuid_t uuid_;
+  // Process Group Name
+  std::string name_;
+  // Process Group Type
+  ProcessGroupType type_;
+  // Processors (ProcessNode) inside this process group which include Input/Output Port, Remote Process Group input/Output port
+  std::set<std::shared_ptr<Processor> > processors_;
+  std::set<ProcessGroup *> child_process_groups_;
+  // Connections between the processor inside the group;
+  std::set<std::shared_ptr<Connection> > connections_;
+  // Parent Process Group
+  ProcessGroup* parent_process_group_;
+  // Yield Period in Milliseconds
+  std::atomic<uint64_t> yield_period_msec_;
+  std::atomic<uint64_t> timeOut_;
+  // URL
+  std::string url_;
+  // Transmitting
+  std::atomic<bool> transmitting_;
+
+ private:
+
+  // Mutex for protection
+  std::mutex mutex_;
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+  // Prevent default copy constructor and assignment operation
+  // Only support pass by reference or pointer
+  ProcessGroup(const ProcessGroup &parent) = delete;
+  ProcessGroup &operator=(const ProcessGroup &parent) = delete;
+};
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/ProcessSession.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
new file mode 100644
index 0000000..b516817
--- /dev/null
+++ b/libminifi/include/core/ProcessSession.h
@@ -0,0 +1,167 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROCESS_SESSION_H__
+#define __PROCESS_SESSION_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <algorithm>
+#include <set>
+
+#include "ProcessContext.h"
+#include "FlowFileRecord.h"
+#include "Exception.h"
+#include "core/logging/Logger.h"
+#include "FlowFile.h"
+#include "provenance/Provenance.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+// ProcessSession Class
+class ProcessSession {
+ public:
+  /*!
+   * Create a new process session. When processContext is null no provenance
+   * reporter is created and getProvenanceReporter() returns nullptr.
+   */
+  ProcessSession(ProcessContext *processContext = nullptr)
+      : process_context_(processContext),
+        logger_(logging::Logger::getLogger()),
+        provenance_report_(nullptr) {
+    if (process_context_ == nullptr)
+      return;  // default argument: nothing to dereference or report against
+    logger_->log_trace("ProcessSession created for %s",
+                       process_context_->getProcessorNode().getName().c_str());
+    auto repo = process_context_->getProvenanceRepository();
+    provenance_report_ = new provenance::ProvenanceReporter(
+        repo, process_context_->getProcessorNode().getUUIDStr(),
+        process_context_->getProcessorNode().getName());
+  }
+
+  // Destructor: releases the reporter (deleting nullptr is a no-op)
+  virtual ~ProcessSession() {
+    delete provenance_report_;
+  }
+  // Commit the session
+  void commit();
+  // Roll Back the session
+  void rollback();
+  // Get Provenance Reporter (nullptr when constructed without a context)
+  provenance::ProvenanceReporter *getProvenanceReporter() {
+    return provenance_report_;
+  }
+
+  // Get the FlowFile from the highest priority queue
+  std::shared_ptr<core::FlowFile> get();
+  // Create a new UUID FlowFile with no content resource claim and without parent
+  std::shared_ptr<core::FlowFile> create();
+  // Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent
+  std::shared_ptr<core::FlowFile> create(std::shared_ptr<core::FlowFile> &parent);
+  // Clone a new UUID FlowFile from parent both for content resource claim and attributes
+  std::shared_ptr<core::FlowFile> clone(
+      std::shared_ptr<core::FlowFile> &parent);
+  // Clone a new UUID FlowFile from parent for attributes and sub set of parent content resource claim
+  std::shared_ptr<core::FlowFile> clone(std::shared_ptr<core::FlowFile> &parent,
+                                        long offset, long size);
+  // Duplicate a FlowFile with the same UUID and all attributes and content resource claim for the roll back of the session
+  std::shared_ptr<core::FlowFile> duplicate(
+      std::shared_ptr<core::FlowFile> &original);
+  // Transfer the FlowFile to the relationship
+  void transfer(std::shared_ptr<core::FlowFile> &flow,
+                Relationship relationship);
+  void transfer(std::shared_ptr<core::FlowFile> &&flow,
+                Relationship relationship);
+  // Put Attribute
+  void putAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key,
+                    std::string value);
+  void putAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key,
+                    std::string value);
+  // Remove Attribute
+  void removeAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key);
+  void removeAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key);
+  // Remove Flow File
+  void remove(std::shared_ptr<core::FlowFile> &flow);
+  void remove(std::shared_ptr<core::FlowFile> &&flow);
+  // Execute the given read callback against the content
+  void read(std::shared_ptr<core::FlowFile> &flow,
+            InputStreamCallback *callback);
+  void read(std::shared_ptr<core::FlowFile> &&flow,
+            InputStreamCallback *callback);
+  // Execute the given write callback against the content
+  void write(std::shared_ptr<core::FlowFile> &flow,
+             OutputStreamCallback *callback);
+  void write(std::shared_ptr<core::FlowFile> &&flow,
+             OutputStreamCallback *callback);
+  // Execute the given write/append callback against the content
+  void append(std::shared_ptr<core::FlowFile> &flow,
+              OutputStreamCallback *callback);
+  void append(std::shared_ptr<core::FlowFile> &&flow,
+              OutputStreamCallback *callback);
+  // Penalize the flow
+  void penalize(std::shared_ptr<core::FlowFile> &flow);
+  void penalize(std::shared_ptr<core::FlowFile> &&flow);
+  // Import the existing file into the flow
+  void import(std::string source, std::shared_ptr<core::FlowFile> &flow,
+              bool keepSource = true, uint64_t offset = 0);
+  void import(std::string source, std::shared_ptr<core::FlowFile> &&flow,
+              bool keepSource = true, uint64_t offset = 0);
+
+  // Prevent default copy constructor and assignment operation
+  // Only support pass by reference or pointer
+  ProcessSession(const ProcessSession &parent) = delete;
+  ProcessSession &operator=(const ProcessSession &parent) = delete;
+
+ protected:
+  // FlowFiles being modified by current process session
+  std::map<std::string, std::shared_ptr<core::FlowFile> > _updatedFlowFiles;
+  // Copy of the original FlowFiles being modified by current process session as above
+  std::map<std::string, std::shared_ptr<core::FlowFile> > _originalFlowFiles;
+  // FlowFiles being added by current process session
+  std::map<std::string, std::shared_ptr<core::FlowFile> > _addedFlowFiles;
+  // FlowFiles being deleted by current process session
+  std::map<std::string, std::shared_ptr<core::FlowFile> > _deletedFlowFiles;
+  // FlowFiles being transferred to the relationship
+  std::map<std::string, Relationship> _transferRelationship;
+  // FlowFiles being cloned for multiple connections per relationship
+  std::map<std::string, std::shared_ptr<core::FlowFile> > _clonedFlowFiles;
+
+ private:
+  // Clone the flow file during transfer to multiple connections for a relationship
+  std::shared_ptr<core::FlowFile> cloneDuringTransfer(
+      std::shared_ptr<core::FlowFile> &parent);
+  // ProcessContext (raw pointer; may be null when default-constructed)
+  ProcessContext *process_context_;
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+  // Provenance Report (allocated in the constructor, deleted in the destructor)
+  provenance::ProvenanceReporter *provenance_report_;
+};
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/ProcessSessionFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessSessionFactory.h b/libminifi/include/core/ProcessSessionFactory.h
new file mode 100644
index 0000000..e0ebe18
--- /dev/null
+++ b/libminifi/include/core/ProcessSessionFactory.h
@@ -0,0 +1,64 @@
+/**
+ * @file ProcessSessionFactory.h
+ * ProcessSessionFactory class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROCESS_SESSION_FACTORY_H__
+#define __PROCESS_SESSION_FACTORY_H__
+
+#include <memory>
+
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+// ProcessSessionFactory Class
+class ProcessSessionFactory {
+ public:
+  /**
+   * Builds a factory bound to the supplied process context.
+   * @param processContext raw pointer stored as-is in process_context_
+   */
+  explicit ProcessSessionFactory(ProcessContext *processContext)
+      : process_context_(processContext) {}
+
+  // Factories are non-copyable; hand them around by reference or pointer.
+  ProcessSessionFactory(const ProcessSessionFactory &parent) = delete;
+  ProcessSessionFactory &operator=(const ProcessSessionFactory &parent) = delete;
+
+  /**
+   * Creates a session.
+   * @return the new session, owned by the caller through the unique_ptr
+   */
+  std::unique_ptr<ProcessSession> createSession();
+
+ private:
+  // Context captured at construction; this class declares no destructor for it
+  ProcessContext *process_context_;
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/Processor.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
new file mode 100644
index 0000000..fd0411f
--- /dev/null
+++ b/libminifi/include/core/Processor.h
@@ -0,0 +1,270 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROCESSOR_H__
+#define __PROCESSOR_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <memory>
+#include <condition_variable>
+#include <atomic>
+#include <algorithm>
+#include <set>
+#include <chrono>
+#include <functional>
+
+#include "Connectable.h"
+#include "ConfigurableComponent.h"
+#include "Property.h"
+#include "utils/TimeUtil.h"
+#include "Relationship.h"
+#include "Connection.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+#include "ProcessSessionFactory.h"
+#include "Scheduling.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+// Minimum scheduling period in nanoseconds; Processor::setSchedulingPeriodNano raises smaller values to this
+#define MINIMUM_SCHEDULING_NANOS 30000
+
+// Default yield period, in seconds
+#define DEFAULT_YIELD_PERIOD_SECONDS 1
+
+// Default penalization period, in seconds
+#define DEFAULT_PENALIZATION_PERIOD_SECONDS 30
+
+// Processor Class: abstract base for processors (onTrigger(context, session) is pure virtual).
+class Processor : public Connectable, public ConfigurableComponent,
+    public std::enable_shared_from_this<Processor> {
+
+ public:
+  // Constructor
+  /*!
+   * Create a new processor; the uuid argument may be omitted (defaults to null).
+   */
+  Processor(std::string name, uuid_t uuid = nullptr);
+  // Destructor
+  virtual ~Processor() {
+  }
+
+  // Whether the processor is running (defined out of line)
+  bool isRunning();
+  // Set Processor Scheduled State
+  void setScheduledState(ScheduledState state);
+  // Get Processor Scheduled State
+  ScheduledState getScheduledState(void) {
+    return state_;
+  }
+  // Set Processor Scheduling Strategy
+  void setSchedulingStrategy(SchedulingStrategy strategy) {
+    strategy_ = strategy;
+  }
+  // Get Processor Scheduling Strategy
+  SchedulingStrategy getSchedulingStrategy(void) {
+    return strategy_;
+  }
+  // Set Processor Loss Tolerant
+  void setlossTolerant(bool lossTolerant) {
+    loss_tolerant_ = lossTolerant;
+  }
+  // Get Processor Loss Tolerant
+  bool getlossTolerant(void) {
+    return loss_tolerant_;
+  }
+  // Set Processor Scheduling Period in nanoseconds; values below MINIMUM_SCHEDULING_NANOS are raised to it
+  void setSchedulingPeriodNano(uint64_t period) {
+    uint64_t minPeriod = MINIMUM_SCHEDULING_NANOS;
+    scheduling_period_nano_ = std::max(period, minPeriod);
+  }
+  // Get Processor Scheduling Period in nanoseconds
+  uint64_t getSchedulingPeriodNano(void) {
+    return scheduling_period_nano_;
+  }
+  // Set Processor Run Duration in nanoseconds
+  void setRunDurationNano(uint64_t period) {
+    run_durantion_nano_ = period;
+  }
+  // Get Processor Run Duration in nanoseconds
+  uint64_t getRunDurationNano(void) {
+    return (run_durantion_nano_);
+  }
+  // Set Processor yield period in milliseconds
+  void setYieldPeriodMsec(uint64_t period) {
+    yield_period_msec_ = period;
+  }
+  // Get Processor yield period in milliseconds
+  uint64_t getYieldPeriodMsec(void) {
+    return (yield_period_msec_);
+  }
+  // Set Processor penalization period in milliseconds
+  void setPenalizationPeriodMsec(uint64_t period) {
+    _penalizationPeriodMsec = period;
+  }
+
+  // Set Processor Maximum Concurrent Tasks
+  void setMaxConcurrentTasks(uint8_t tasks) {
+    max_concurrent_tasks_ = tasks;
+  }
+  // Get Processor Maximum Concurrent Tasks
+  uint8_t getMaxConcurrentTasks(void) {
+    return (max_concurrent_tasks_);
+  }
+  // Set Trigger when empty
+  void setTriggerWhenEmpty(bool value) {
+    _triggerWhenEmpty = value;
+  }
+  // Get Trigger when empty
+  bool getTriggerWhenEmpty(void) {
+    return (_triggerWhenEmpty);
+  }
+  // Get Active Task Counts
+  uint8_t getActiveTasks(void) {
+    return (active_tasks_);
+  }
+  // Increment Active Task Counts
+  void incrementActiveTasks(void) {
+    active_tasks_++;
+  }
+  // Decrement Active Task Counts
+  void decrementActiveTask(void) {
+    active_tasks_--;
+  }
+  // Reset Active Task Counts to zero
+  void clearActiveTask(void) {
+    active_tasks_ = 0;
+  }
+  // Yield based on the yield period
+  void yield() {
+    yield_expiration_ = (getTimeMillis() + yield_period_msec_);
+  }
+  // Yield based on the input time
+  void yield(uint64_t time) {
+    yield_expiration_ = (getTimeMillis() + time);
+  }
+  // Whether the processor is currently yielding; an expiration of 0 means
+  // no yield is set.
+  bool isYield() {
+    // Read the atomic once so the zero check and the comparison see the same value.
+    const uint64_t expiration = yield_expiration_;
+    return expiration > 0 && expiration >= getTimeMillis();
+  }
+  // Clear yield expiration
+  void clearYield() {
+    yield_expiration_ = 0;
+  }
+  // Get remaining yield time in milliseconds; 0 when no yield is pending
+  uint64_t getYieldTime() {
+    // Read the atomic once: if clearYield() ran between a compare and a
+    // second read, the unsigned subtraction would wrap to a huge value.
+    const uint64_t curTime = getTimeMillis();
+    const uint64_t expiration = yield_expiration_;
+    return expiration > curTime ? expiration - curTime : 0;
+  }
+  // Whether flow file queued in incoming connection
+  bool flowFilesQueued();
+  // Whether flow file queue full in any of the outgoing connections
+  bool flowFilesOutGoingFull();
+
+  // Get outgoing connections based on relationship name
+  std::set<std::shared_ptr<Connection> > getOutGoingConnections(
+      std::string relationship);
+  // Add connection
+  bool addConnection(std::shared_ptr<Connectable> connection);
+  // Remove connection
+  void removeConnection(std::shared_ptr<Connectable> connection);
+  // Get the UUID as string
+  std::string getUUIDStr() {
+    return uuidStr_;
+  }
+  // Get the Next RoundRobin incoming connection
+  std::shared_ptr<Connection> getNextIncomingConnection();
+  // On Trigger
+  void onTrigger(ProcessContext *context,
+                 ProcessSessionFactory *sessionFactory);
+
+  virtual bool canEdit() {
+    return !isRunning();
+  }
+
+ public:
+
+  // Virtual hooks for concrete processors follow.
+  // OnTrigger method, implemented by NiFi Processor Designer
+  virtual void onTrigger(ProcessContext *context, ProcessSession *session) = 0;
+  // Initialize, overridden by NiFi Process Designer (default does nothing)
+  virtual void initialize() {
+  }
+  // Scheduled event hook, overridden by NiFi Process Designer (default does nothing)
+  virtual void onSchedule(ProcessContext *context,
+                          ProcessSessionFactory *sessionFactory) {
+  }
+
+ protected:
+
+  // Processor state
+  std::atomic<ScheduledState> state_;
+
+  // lossTolerant
+  std::atomic<bool> loss_tolerant_;
+  // SchedulePeriod in Nano Seconds
+  std::atomic<uint64_t> scheduling_period_nano_;
+  // Run Duration in Nano Seconds (identifier spelling kept: it is protected, so subclasses may use it)
+  std::atomic<uint64_t> run_durantion_nano_;
+  // Yield Period in Milliseconds
+  std::atomic<uint64_t> yield_period_msec_;
+
+  // Active Tasks
+  std::atomic<uint8_t> active_tasks_;
+  // Trigger the Processor even if the incoming connection is empty
+  std::atomic<bool> _triggerWhenEmpty;
+
+ private:
+
+  // Mutex for protection
+  std::mutex mutex_;
+  // Yield Expiration (0 means no yield is set)
+  std::atomic<uint64_t> yield_expiration_;
+
+  // Check all incoming connections for work
+  bool isWorkAvailable();
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+  // Prevent default copy constructor and assignment operation
+  // Only support pass by reference or pointer
+  Processor(const Processor &parent) = delete;
+  Processor &operator=(const Processor &parent) = delete;
+
+};
+
+}
+/* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/ProcessorConfig.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessorConfig.h b/libminifi/include/core/ProcessorConfig.h
new file mode 100644
index 0000000..6b4a00a
--- /dev/null
+++ b/libminifi/include/core/ProcessorConfig.h
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_
+#define LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_
+
+#include "core/core.h"
+#include "core/Property.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+
+struct ProcessorConfig {  // Plain aggregate of processor settings; all scalar fields, numeric-looking ones included, are strings
+ std::string id;
+ std::string name;
+ std::string javaClass;  // NOTE(review): presumably the processor's class name -- confirm against the code that fills this in
+ std::string maxConcurrentTasks;
+ std::string schedulingStrategy;
+ std::string schedulingPeriod;  // NOTE(review): no unit is fixed by this header -- confirm where the string is parsed
+ std::string penalizationPeriod;
+ std::string yieldPeriod;
+ std::string runDurationNanos;  // name suggests nanoseconds; still stored as text here
+ std::vector<std::string> autoTerminatedRelationships;  // presumably relationship names -- TODO confirm
+ std::vector<core::Property> properties;
+};
+
+
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+
+#endif /* LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/ProcessorNode.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessorNode.h b/libminifi/include/core/ProcessorNode.h
new file mode 100644
index 0000000..8836f62
--- /dev/null
+++ b/libminifi/include/core/ProcessorNode.h
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBMINIFI_INCLUDE_PROCESSOR_PROCESSORNODE_H_
+#define LIBMINIFI_INCLUDE_PROCESSOR_PROCESSORNODE_H_
+
+#include "ConfigurableComponent.h"
+#include "Connectable.h"
+#include "Property.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+/**
+ * ProcessorNode wraps a Connectable and forwards most calls to it.
+ * Property getters fall back to this node's own ConfigurableComponent state when
+ * the wrapped object is not a ConfigurableComponent; property setters update both.
+ */
+class ProcessorNode : public ConfigurableComponent, public Connectable {
+ public:
+  explicit ProcessorNode(const std::shared_ptr<Connectable> processor);
+
+  explicit ProcessorNode(const ProcessorNode &other);
+
+  /**
+   * Access the wrapped connectable.
+   * The shared pointer is returned by value, so the caller shares
+   * ownership of the same underlying object.
+   * @return the connectable this node forwards to.
+   */
+  std::shared_ptr<Connectable> getProcessor() const {
+    return processor_;
+  }
+
+  void yield() {
+    processor_->yield();
+  }
+
+  /**
+   * Look up a property by name, preferring the wrapped object.
+   * @param name property name.
+   * @param value output argument, handed on to the getProperty call that is used.
+   * @return result of the getProperty call that was used.
+   */
+  bool getProperty(const std::string name, std::string &value) {
+    const auto configurable =
+        std::dynamic_pointer_cast<ConfigurableComponent>(processor_);
+    if (!configurable) {
+      // Wrapped object is not a ConfigurableComponent: use this node's own.
+      return ConfigurableComponent::getProperty(name, value);
+    }
+    return configurable->getProperty(name, value);
+  }
+  /**
+   * Set a property by name on this node and, when possible, on the wrapped object.
+   * @param name property name
+   * @param value property value.
+   * @return the wrapped object's result when it is configurable, else the local result.
+   */
+  bool setProperty(const std::string name, std::string value) {
+    const auto configurable =
+        std::dynamic_pointer_cast<ConfigurableComponent>(processor_);
+    // The value is always applied locally first; the wrapped component's
+    // result replaces the local one whenever such a component exists.
+    const bool stored_locally = ConfigurableComponent::setProperty(name, value);
+    if (!configurable)
+      return stored_locally;
+    return configurable->setProperty(name, value);
+  }
+
+  /**
+   * Set a property, identified by a Property object, on this node and on the wrapped object.
+   * @param prop property to set
+   * @param value property value.
+   * @return the wrapped object's result when it is configurable, else the local result.
+   */
+  bool setProperty(Property &prop, std::string value) {
+    const auto configurable =
+        std::dynamic_pointer_cast<ConfigurableComponent>(processor_);
+    // Same policy as the name-based overload: local first, wrapped result wins.
+    const bool stored_locally = ConfigurableComponent::setProperty(prop, value);
+    if (!configurable)
+      return stored_locally;
+    return configurable->setProperty(prop, value);
+  }
+
+  /**
+   * Set the supported properties on this node and on the wrapped object.
+   * @param properties supported properties
+   * @return the wrapped object's result when it is configurable, else the local result.
+   */
+  bool setSupportedProperties(std::set<Property> properties) {
+    const auto configurable =
+        std::dynamic_pointer_cast<ConfigurableComponent>(processor_);
+    // Local first, then the wrapped component; its result wins when present.
+    const bool stored_locally = ConfigurableComponent::setSupportedProperties(properties);
+    if (!configurable)
+      return stored_locally;
+    return configurable->setSupportedProperties(properties);
+  }
+
+  /**
+   * The four relationship calls below are forwarded unchanged to the
+   * wrapped connectable.
+   * @return whatever the wrapped connectable returns.
+   */
+  bool setAutoTerminatedRelationships(std::set<Relationship> relationships) {
+    return processor_->setAutoTerminatedRelationships(relationships);
+  }
+
+  bool isAutoTerminated(Relationship relationship) {
+    return processor_->isAutoTerminated(relationship);
+  }
+
+  bool setSupportedRelationships(std::set<Relationship> relationships) {
+    return processor_->setSupportedRelationships(relationships);
+  }
+
+  bool isSupportedRelationship(Relationship relationship) {
+    return processor_->isSupportedRelationship(relationship);
+  }
+
+  /**
+   * Apply the name to this node and to the wrapped connectable.
+   * @param name name to set
+   */
+  void setName(const std::string name) {
+    Connectable::setName(name);
+    processor_->setName(name);
+  }
+
+  /**
+   * Apply the UUID to this node and to the wrapped connectable.
+   * @param uuid uuid to apply.
+   */
+  void setUUID(uuid_t uuid) {
+    Connectable::setUUID(uuid);
+    processor_->setUUID(uuid);
+  }
+
+  // Penalization period in milliseconds, as reported by the wrapped connectable
+  uint64_t getPenalizationPeriodMsec(void) {
+    return processor_->getPenalizationPeriodMsec();
+  }
+
+  /**
+   * Outgoing connections of the wrapped connectable for a relationship.
+   * @return set of outgoing connections.
+   */
+  std::set<std::shared_ptr<Connectable>> getOutGoingConnections(
+      std::string relationship) {
+    return processor_->getOutGoingConnections(relationship);
+  }
+
+  /**
+   * Next incoming connection of the wrapped connectable.
+   * @return next incoming connection
+   */
+  std::shared_ptr<Connectable> getNextIncomingConnection() {
+    return processor_->getNextIncomingConnection();
+  }
+
+  /**
+   * @return the wrapped connectable's hasIncomingConnections() result
+   */
+  bool hasIncomingConnections() {
+    return processor_->hasIncomingConnections();
+  }
+
+  /**
+   * Ask the wrapped connectable for its UUID.
+   * @param uuid handed on to the wrapped connectable's getUUID(uuid)
+   * @return the wrapped connectable's result
+   */
+  bool getUUID(uuid_t uuid) {
+    return processor_->getUUID(uuid);
+  }
+
+  const unsigned char *getUUID() {
+    return processor_->getUUID();
+  }
+  /**
+   * UUID string of the wrapped connectable.
+   * @return constant reference to the UUID string
+   */
+  const std::string & getUUIDStr() const {
+    return processor_->getUUIDStr();
+  }
+
+  // Name of the wrapped connectable
+  std::string getName() const {
+    return processor_->getName();
+  }
+
+  uint8_t getMaxConcurrentTasks() {
+    return processor_->getMaxConcurrentTasks();
+  }
+
+  void setMaxConcurrentTasks(const uint8_t tasks) {
+    processor_->setMaxConcurrentTasks(tasks);
+  }
+
+  virtual bool isRunning();
+
+  virtual bool isWorkAvailable();
+
+  virtual ~ProcessorNode();
+
+ protected:
+
+  virtual bool canEdit() {
+    return !processor_->isRunning();
+  }
+
+  /**
+   * The wrapped connectable that calls are forwarded to.
+   * Most methods dereference it without a null check.
+   */
+  std::shared_ptr<Connectable> processor_;
+
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_PROCESSOR_PROCESSORNODE_H_ */