You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/10/13 15:07:35 UTC
[17/18] nifi-minifi-cpp git commit: MINIFI-34 Establishing CMake
build system to provide build functionality equivalent to pre-existing
Makefile.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/ProcessGroup.h
----------------------------------------------------------------------
diff --git a/inc/ProcessGroup.h b/inc/ProcessGroup.h
deleted file mode 100644
index 4dd26f8..0000000
--- a/inc/ProcessGroup.h
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * @file ProcessGroup.h
- * ProcessGroup class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __PROCESS_GROUP_H__
-#define __PROCESS_GROUP_H__
-
-#include <uuid/uuid.h>
-#include <vector>
-#include <queue>
-#include <map>
-#include <mutex>
-#include <atomic>
-#include <algorithm>
-#include <set>
-
-#include "Logger.h"
-#include "Processor.h"
-#include "Exception.h"
-#include "TimerDrivenSchedulingAgent.h"
-
-//! Process Group Type
-enum ProcessGroupType
-{
- ROOT_PROCESS_GROUP = 0,
- REMOTE_PROCESS_GROUP,
- MAX_PROCESS_GROUP_TYPE
-};
-
-//! ProcessGroup Class
-class ProcessGroup
-{
-public:
- //! Constructor
- /*!
- * Create a new process group
- */
- ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid = NULL, ProcessGroup *parent = NULL);
- //! Destructor
- virtual ~ProcessGroup();
- //! Set Processor Name
- void setName(std::string name) {
- _name = name;
- }
- //! Get Process Name
- std::string getName(void) {
- return (_name);
- }
- //! Set URL
- void setURL(std::string url) {
- _url = url;
- }
- //! Get URL
- std::string getURL(void) {
- return (_url);
- }
- //! SetTransmitting
- void setTransmitting(bool val)
- {
- _transmitting = val;
- }
- //! Get Transmitting
- bool getTransmitting()
- {
- return _transmitting;
- }
- //! setTimeOut
- void setTimeOut(uint64_t time)
- {
- _timeOut = time;
- }
- uint64_t getTimeOut()
- {
- return _timeOut;
- }
- //! Set Processor yield period in MilliSecond
- void setYieldPeriodMsec(uint64_t period) {
- _yieldPeriodMsec = period;
- }
- //! Get Processor yield period in MilliSecond
- uint64_t getYieldPeriodMsec(void) {
- return(_yieldPeriodMsec);
- }
- //! Set UUID
- void setUUID(uuid_t uuid) {
- uuid_copy(_uuid, uuid);
- }
- //! Get UUID
- bool getUUID(uuid_t uuid) {
- if (uuid)
- {
- uuid_copy(uuid, _uuid);
- return true;
- }
- else
- return false;
- }
- //! Start Processing
- void startProcessing(TimerDrivenSchedulingAgent *timeScheduler);
- //! Stop Processing
- void stopProcessing(TimerDrivenSchedulingAgent *timeScheduler);
- //! Whether it is root process group
- bool isRootProcessGroup();
- //! set parent process group
- void setParent(ProcessGroup *parent) {
- std::lock_guard<std::mutex> lock(_mtx);
- _parentProcessGroup = parent;
- }
- //! get parent process group
- ProcessGroup *getParent(void) {
- std::lock_guard<std::mutex> lock(_mtx);
- return _parentProcessGroup;
- }
- //! Add processor
- void addProcessor(Processor *processor);
- //! Remove processor
- void removeProcessor(Processor *processor);
- //! Add child processor group
- void addProcessGroup(ProcessGroup *child);
- //! Remove child processor group
- void removeProcessGroup(ProcessGroup *child);
- // ! Add connections
- void addConnection(Connection *connection);
- //! findProcessor based on UUID
- Processor *findProcessor(uuid_t uuid);
- //! findProcessor based on name
- Processor *findProcessor(std::string processorName);
- //! removeConnection
- void removeConnection(Connection *connection);
- //! update property value
- void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue);
-
-protected:
- //! A global unique identifier
- uuid_t _uuid;
- //! Processor Group Name
- std::string _name;
- //! Process Group Type
- ProcessGroupType _type;
- //! Processors (ProcessNode) inside this process group which include Input/Output Port, Remote Process Group input/Output port
- std::set<Processor *> _processors;
- std::set<ProcessGroup *> _childProcessGroups;
- //! Connections between the processor inside the group;
- std::set<Connection *> _connections;
- //! Parent Process Group
- ProcessGroup* _parentProcessGroup;
- //! Yield Period in Milliseconds
- std::atomic<uint64_t> _yieldPeriodMsec;
- std::atomic<uint64_t> _timeOut;
- //! URL
- std::string _url;
- //! Transmitting
- std::atomic<bool> _transmitting;
-
-private:
-
- //! Mutex for protection
- std::mutex _mtx;
- //! Logger
- Logger *_logger;
- // Prevent default copy constructor and assignment operation
- // Only support pass by reference or pointer
- ProcessGroup(const ProcessGroup &parent);
- ProcessGroup &operator=(const ProcessGroup &parent);
-};
-
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/ProcessSession.h
----------------------------------------------------------------------
diff --git a/inc/ProcessSession.h b/inc/ProcessSession.h
deleted file mode 100644
index c8ec3a5..0000000
--- a/inc/ProcessSession.h
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * @file ProcessSession.h
- * ProcessSession class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __PROCESS_SESSION_H__
-#define __PROCESS_SESSION_H__
-
-#include <uuid/uuid.h>
-#include <vector>
-#include <queue>
-#include <map>
-#include <mutex>
-#include <atomic>
-#include <algorithm>
-#include <set>
-
-#include "Logger.h"
-#include "Processor.h"
-#include "ProcessContext.h"
-#include "FlowFileRecord.h"
-#include "Exception.h"
-
-//! ProcessSession Class
-class ProcessSession
-{
-public:
- //! Constructor
- /*!
- * Create a new process session
- */
- ProcessSession(ProcessContext *processContext = NULL) : _processContext(processContext) {
- _logger = Logger::getLogger();
- _logger->log_trace("ProcessSession created for %s", _processContext->getProcessor()->getName().c_str());
- }
- //! Destructor
- virtual ~ProcessSession() {}
- //! Commit the session
- void commit();
- //! Roll Back the session
- void rollback();
- //!
- //! Get the FlowFile from the highest priority queue
- FlowFileRecord *get();
- //! Create a new UUID FlowFile with no content resource claim and without parent
- FlowFileRecord *create();
- //! Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent
- FlowFileRecord *create(FlowFileRecord *parent);
- //! Clone a new UUID FlowFile from parent both for content resource claim and attributes
- FlowFileRecord *clone(FlowFileRecord *parent);
- //! Clone a new UUID FlowFile from parent for attributes and sub set of parent content resource claim
- FlowFileRecord *clone(FlowFileRecord *parent, long offset, long size);
- //! Duplicate a FlowFile with the same UUID and all attributes and content resource claim for the roll back of the session
- FlowFileRecord *duplicate(FlowFileRecord *orignal);
- //! Transfer the FlowFile to the relationship
- void transfer(FlowFileRecord *flow, Relationship relationship);
- //! Put Attribute
- void putAttribute(FlowFileRecord *flow, std::string key, std::string value);
- //! Remove Attribute
- void removeAttribute(FlowFileRecord *flow, std::string key);
- //! Remove Flow File
- void remove(FlowFileRecord *flow);
- //! Execute the given read callback against the content
- void read(FlowFileRecord *flow, InputStreamCallback *callback);
- //! Execute the given write callback against the content
- void write(FlowFileRecord *flow, OutputStreamCallback *callback);
- //! Execute the given write/append callback against the content
- void append(FlowFileRecord *flow, OutputStreamCallback *callback);
- //! Penalize the flow
- void penalize(FlowFileRecord *flow);
- //! Import the existed file into the flow
- void import(std::string source, FlowFileRecord *flow, bool keepSource = true, uint64_t offset = 0);
-
-protected:
- //! FlowFiles being modified by current process session
- std::map<std::string, FlowFileRecord *> _updatedFlowFiles;
- //! Copy of the original FlowFiles being modified by current process session as above
- std::map<std::string, FlowFileRecord *> _originalFlowFiles;
- //! FlowFiles being added by current process session
- std::map<std::string, FlowFileRecord *> _addedFlowFiles;
- //! FlowFiles being deleted by current process session
- std::map<std::string, FlowFileRecord *> _deletedFlowFiles;
- //! FlowFiles being transfered to the relationship
- std::map<std::string, Relationship> _transferRelationship;
- //! FlowFiles being cloned for multiple connections per relationship
- std::map<std::string, FlowFileRecord *> _clonedFlowFiles;
-
-private:
- // Clone the flow file during transfer to multiple connections for a relationship
- FlowFileRecord* cloneDuringTransfer(FlowFileRecord *parent);
- //! ProcessContext
- ProcessContext *_processContext;
- // Prevent default copy constructor and assignment operation
- // Only support pass by reference or pointer
- ProcessSession(const ProcessSession &parent);
- ProcessSession &operator=(const ProcessSession &parent);
- //! Logger
- Logger *_logger;
-
-};
-
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/Processor.h
----------------------------------------------------------------------
diff --git a/inc/Processor.h b/inc/Processor.h
deleted file mode 100644
index db26ad0..0000000
--- a/inc/Processor.h
+++ /dev/null
@@ -1,346 +0,0 @@
-/**
- * @file Processor.h
- * Processor class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __PROCESSOR_H__
-#define __PROCESSOR_H__
-
-#include <uuid/uuid.h>
-#include <vector>
-#include <queue>
-#include <map>
-#include <mutex>
-#include <atomic>
-#include <algorithm>
-#include <set>
-
-#include "TimeUtil.h"
-#include "Property.h"
-#include "Relationship.h"
-#include "Connection.h"
-
-//! Forwarder declaration
-class ProcessContext;
-class ProcessSession;
-
-//! Minimum scheduling period in Nano Second
-#define MINIMUM_SCHEDULING_NANOS 30000
-
-//! Default yield period in second
-#define DEFAULT_YIELD_PERIOD_SECONDS 1
-
-//! Default penalization period in second
-#define DEFAULT_PENALIZATION_PERIOD_SECONDS 30
-
-/*!
- * Indicates the valid values for the state of a entity
- * with respect to scheduling the entity to run.
- */
-enum ScheduledState {
-
- /**
- * Entity cannot be scheduled to run
- */
- DISABLED,
- /**
- * Entity can be scheduled to run but currently is not
- */
- STOPPED,
- /**
- * Entity is currently scheduled to run
- */
- RUNNING
-};
-
-/*!
- * Scheduling Strategy
- */
-enum SchedulingStrategy {
- //! Event driven
- EVENT_DRIVEN,
- //! Timer driven
- TIMER_DRIVEN,
- //! Cron Driven
- CRON_DRIVEN
-};
-
-//! Processor Class
-class Processor
-{
- friend class ProcessContext;
-public:
- //! Constructor
- /*!
- * Create a new processor
- */
- Processor(std::string name, uuid_t uuid = NULL);
- //! Destructor
- virtual ~Processor();
- //! Set Processor Name
- void setName(std::string name) {
- _name = name;
- }
- //! Get Process Name
- std::string getName(void) {
- return (_name);
- }
- //! Set UUID
- void setUUID(uuid_t uuid) {
- uuid_copy(_uuid, uuid);
- char uuidStr[37];
- uuid_unparse(_uuid, uuidStr);
- _uuidStr = uuidStr;
- }
- //! Get UUID
- bool getUUID(uuid_t uuid) {
- if (uuid)
- {
- uuid_copy(uuid, _uuid);
- return true;
- }
- else
- return false;
- }
- //! Set the supported processor properties while the process is not running
- bool setSupportedProperties(std::set<Property> properties);
- //! Set the supported relationships while the process is not running
- bool setSupportedRelationships(std::set<Relationship> relationships);
- //! Get the supported property value by name
- bool getProperty(std::string name, std::string &value);
- //! Set the supported property value by name wile the process is not running
- bool setProperty(std::string name, std::string value);
- //! Whether the relationship is supported
- bool isSupportedRelationship(Relationship relationship);
- //! Set the auto terminated relationships while the process is not running
- bool setAutoTerminatedRelationships(std::set<Relationship> relationships);
- //! Check whether the relationship is auto terminated
- bool isAutoTerminated(Relationship relationship);
- //! Check whether the processor is running
- bool isRunning();
- //! Set Processor Scheduled State
- void setScheduledState(ScheduledState state) {
- _state = state;
- }
- //! Get Processor Scheduled State
- ScheduledState getScheduledState(void) {
- return _state;
- }
- //! Set Processor Scheduling Strategy
- void setSchedulingStrategy(SchedulingStrategy strategy) {
- _strategy = strategy;
- }
- //! Get Processor Scheduling Strategy
- SchedulingStrategy getSchedulingStrategy(void) {
- return _strategy;
- }
- //! Set Processor Loss Tolerant
- void setlossTolerant(bool lossTolerant) {
- _lossTolerant = lossTolerant;
- }
- //! Get Processor Loss Tolerant
- bool getlossTolerant(void) {
- return _lossTolerant;
- }
- //! Set Processor Scheduling Period in Nano Second
- void setSchedulingPeriodNano(uint64_t period) {
- uint64_t minPeriod = MINIMUM_SCHEDULING_NANOS;
- _schedulingPeriodNano = std::max(period, minPeriod);
- }
- //! Get Processor Scheduling Period in Nano Second
- uint64_t getSchedulingPeriodNano(void) {
- return _schedulingPeriodNano;
- }
- //! Set Processor Run Duration in Nano Second
- void setRunDurationNano(uint64_t period) {
- _runDurantionNano = period;
- }
- //! Get Processor Run Duration in Nano Second
- uint64_t getRunDurationNano(void) {
- return(_runDurantionNano);
- }
- //! Set Processor yield period in MilliSecond
- void setYieldPeriodMsec(uint64_t period) {
- _yieldPeriodMsec = period;
- }
- //! Get Processor yield period in MilliSecond
- uint64_t getYieldPeriodMsec(void) {
- return(_yieldPeriodMsec);
- }
- //! Set Processor penalization period in MilliSecond
- void setPenalizationPeriodMsec(uint64_t period) {
- _penalizationPeriodMsec = period;
- }
- //! Get Processor penalization period in MilliSecond
- uint64_t getPenalizationPeriodMsec(void) {
- return(_penalizationPeriodMsec);
- }
- //! Set Processor Maximum Concurrent Tasks
- void setMaxConcurrentTasks(uint8_t tasks) {
- _maxConcurrentTasks = tasks;
- }
- //! Get Processor Maximum Concurrent Tasks
- uint8_t getMaxConcurrentTasks(void) {
- return(_maxConcurrentTasks);
- }
- //! Set Trigger when empty
- void setTriggerWhenEmpty(bool value) {
- _triggerWhenEmpty = value;
- }
- //! Get Trigger when empty
- bool getTriggerWhenEmpty(void) {
- return(_triggerWhenEmpty);
- }
- //! Get Active Task Counts
- uint8_t getActiveTasks(void) {
- return(_activeTasks);
- }
- //! Increment Active Task Counts
- void incrementActiveTasks(void) {
- _activeTasks++;
- }
- //! decrement Active Task Counts
- void decrementActiveTask(void) {
- _activeTasks--;
- }
- void clearActiveTask(void) {
- _activeTasks = 0;
- }
- //! Yield based on the yield period
- void yield()
- {
- _yieldExpiration = (getTimeMillis() + _yieldPeriodMsec);
- }
- //! Yield based on the input time
- void yield(uint64_t time)
- {
- _yieldExpiration = (getTimeMillis() + time);
- }
- //! whether need be to yield
- bool isYield()
- {
- if (_yieldExpiration > 0)
- return (_yieldExpiration >= getTimeMillis());
- else
- return false;
- }
- // clear yield expiration
- void clearYield()
- {
- _yieldExpiration = 0;
- }
- // get yield time
- uint64_t getYieldTime()
- {
- uint64_t curTime = getTimeMillis();
- if (_yieldExpiration > curTime)
- return (_yieldExpiration - curTime);
- else
- return 0;;
- }
- //! Whether flow file queued in incoming connection
- bool flowFilesQueued();
- //! Whether flow file queue full in any of the outgoin connection
- bool flowFilesOutGoingFull();
- //! Get incoming connections
- std::set<Connection *> getIncomingConnections() {
- return _incomingConnections;
- }
- //! Has Incoming Connection
- bool hasIncomingConnections() {
- return (_incomingConnections.size() > 0);
- }
- //! Get outgoing connections based on relationship name
- std::set<Connection *> getOutGoingConnections(std::string relationship);
- //! Add connection
- bool addConnection(Connection *connection);
- //! Remove connection
- void removeConnection(Connection *connection);
- //! Get the UUID as string
- std::string getUUIDStr() {
- return _uuidStr;
- }
- //! Get the Next RoundRobin incoming connection
- Connection *getNextIncomingConnection();
- //! On Trigger
- void onTrigger();
-
-public:
- //! OnTrigger method, implemented by NiFi Processor Designer
- virtual void onTrigger(ProcessContext *context, ProcessSession *session) = 0;
- //! Initialize, over write by NiFi Process Designer
- virtual void initialize(void) {
- return;
- }
-
-protected:
-
- //! A global unique identifier
- uuid_t _uuid;
- //! Processor Name
- std::string _name;
- //! Supported properties
- std::map<std::string, Property> _properties;
- //! Supported relationships
- std::map<std::string, Relationship> _relationships;
- //! Autoterminated relationships
- std::map<std::string, Relationship> _autoTerminatedRelationships;
- //! Processor state
- std::atomic<ScheduledState> _state;
- //! Scheduling Strategy
- std::atomic<SchedulingStrategy> _strategy;
- //! lossTolerant
- std::atomic<bool> _lossTolerant;
- //! SchedulePeriod in Nano Seconds
- std::atomic<uint64_t> _schedulingPeriodNano;
- //! Run Duration in Nano Seconds
- std::atomic<uint64_t> _runDurantionNano;
- //! Yield Period in Milliseconds
- std::atomic<uint64_t> _yieldPeriodMsec;
- //! Penalization Period in MilliSecond
- std::atomic<uint64_t> _penalizationPeriodMsec;
- //! Maximum Concurrent Tasks
- std::atomic<uint8_t> _maxConcurrentTasks;
- //! Active Tasks
- std::atomic<uint8_t> _activeTasks;
- //! Trigger the Processor even if the incoming connection is empty
- std::atomic<bool> _triggerWhenEmpty;
- //! Incoming connections
- std::set<Connection *> _incomingConnections;
- //! Outgoing connections map based on Relationship name
- std::map<std::string, std::set<Connection *>> _outGoingConnections;
- //! UUID string
- std::string _uuidStr;
-
-private:
-
- //! Mutex for protection
- std::mutex _mtx;
- //! Yield Expiration
- std::atomic<uint64_t> _yieldExpiration;
- //! Incoming connection Iterator
- std::set<Connection *>::iterator _incomingConnectionsIter;
- //! Logger
- Logger *_logger;
- // Prevent default copy constructor and assignment operation
- // Only support pass by reference or pointer
- Processor(const Processor &parent);
- Processor &operator=(const Processor &parent);
-
-};
-
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/Property.h
----------------------------------------------------------------------
diff --git a/inc/Property.h b/inc/Property.h
deleted file mode 100644
index a724394..0000000
--- a/inc/Property.h
+++ /dev/null
@@ -1,344 +0,0 @@
-/**
- * @file Property.h
- * Processor Property class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __PROPERTY_H__
-#define __PROPERTY_H__
-
-#include <string>
-#include <vector>
-#include <queue>
-#include <map>
-#include <mutex>
-#include <atomic>
-#include <set>
-#include <stdlib.h>
-#include <math.h>
-
-//! Time Unit
-enum TimeUnit {
- DAY,
- HOUR,
- MINUTE,
- SECOND,
- MILLISECOND,
- NANOSECOND
-};
-
-//! Property Class
-class Property {
-
-public:
- //! Constructor
- /*!
- * Create a new property
- */
- Property(const std::string name, const std::string description, const std::string value)
- : _name(name), _description(description), _value(value) {
- }
- Property() {}
- //! Destructor
- virtual ~Property() {}
- //! Get Name for the property
- std::string getName() {
- return _name;
- }
- //! Get Description for the property
- std::string getDescription() {
- return _description;
- }
- //! Get value for the property
- std::string getValue() {
- return _value;
- }
- //! Set value for the property
- void setValue(std::string value) {
- _value = value;
- }
- //! Compare
- bool operator < (const Property & right) const {
- return _name < right._name;
- }
-
- //! Convert TimeUnit to MilliSecond
- static bool ConvertTimeUnitToMS(int64_t input, TimeUnit unit, int64_t &out)
- {
- if (unit == MILLISECOND)
- {
- out = input;
- return true;
- }
- else if (unit == SECOND)
- {
- out = input * 1000;
- return true;
- }
- else if (unit == MINUTE)
- {
- out = input * 60 * 1000;
- return true;
- }
- else if (unit == HOUR)
- {
- out = input * 60 * 60 * 1000;
- return true;
- }
- else if (unit == DAY)
- {
- out = 24 * 60 * 60 * 1000;
- return true;
- }
- else if (unit == NANOSECOND)
- {
- out = input/1000/1000;
- return true;
- }
- else
- {
- return false;
- }
- }
- //! Convert TimeUnit to NanoSecond
- static bool ConvertTimeUnitToNS(int64_t input, TimeUnit unit, int64_t &out)
- {
- if (unit == MILLISECOND)
- {
- out = input * 1000 * 1000;
- return true;
- }
- else if (unit == SECOND)
- {
- out = input * 1000 * 1000 * 1000;
- return true;
- }
- else if (unit == MINUTE)
- {
- out = input * 60 * 1000 * 1000 * 1000;
- return true;
- }
- else if (unit == HOUR)
- {
- out = input * 60 * 60 * 1000 * 1000 * 1000;
- return true;
- }
- else if (unit == NANOSECOND)
- {
- out = input;
- return true;
- }
- else
- {
- return false;
- }
- }
- //! Convert String
- static bool StringToTime(std::string input, int64_t &output, TimeUnit &timeunit)
- {
- if (input.size() == 0) {
- return false;
- }
-
- const char *cvalue = input.c_str();
- char *pEnd;
- long int ival = strtol(cvalue, &pEnd, 0);
-
- if (pEnd[0] == '\0')
- {
- return false;
- }
-
- while (*pEnd == ' ')
- {
- // Skip the space
- pEnd++;
- }
-
- std::string unit(pEnd);
-
- if (unit == "sec" || unit == "s" || unit == "second" || unit == "seconds" || unit == "secs")
- {
- timeunit = SECOND;
- output = ival;
- return true;
- }
- else if (unit == "min" || unit == "m" || unit == "mins" || unit == "minute" || unit == "minutes")
- {
- timeunit = MINUTE;
- output = ival;
- return true;
- }
- else if (unit == "ns" || unit == "nano" || unit == "nanos" || unit == "nanoseconds")
- {
- timeunit = NANOSECOND;
- output = ival;
- return true;
- }
- else if (unit == "ms" || unit == "milli" || unit == "millis" || unit == "milliseconds")
- {
- timeunit = MILLISECOND;
- output = ival;
- return true;
- }
- else if (unit == "h" || unit == "hr" || unit == "hour" || unit == "hrs" || unit == "hours")
- {
- timeunit = HOUR;
- output = ival;
- return true;
- }
- else if (unit == "d" || unit == "day" || unit == "days")
- {
- timeunit = DAY;
- output = ival;
- return true;
- }
- else
- return false;
- }
-
- //! Convert String to Integer
- static bool StringToInt(std::string input, int64_t &output)
- {
- if (input.size() == 0) {
- return false;
- }
-
- const char *cvalue = input.c_str();
- char *pEnd;
- long int ival = strtol(cvalue, &pEnd, 0);
-
- if (pEnd[0] == '\0')
- {
- output = ival;
- return true;
- }
-
- while (*pEnd == ' ')
- {
- // Skip the space
- pEnd++;
- }
-
- char end0 = toupper(pEnd[0]);
- if ( (end0 == 'K') || (end0 == 'M') || (end0 == 'G') || (end0 == 'T') || (end0 == 'P') )
- {
- if (pEnd[1] == '\0')
- {
- unsigned long int multiplier = 1000;
-
- if ( (end0 != 'K') ) {
- multiplier *= 1000;
- if (end0 != 'M') {
- multiplier *= 1000;
- if (end0 != 'G') {
- multiplier *= 1000;
- if (end0 != 'T') {
- multiplier *= 1000;
- }
- }
- }
- }
- output = ival * multiplier;
- return true;
-
- } else if ((pEnd[1] == 'b' || pEnd[1] == 'B') && (pEnd[2] == '\0')) {
-
- unsigned long int multiplier = 1024;
-
- if ( (end0 != 'K') ) {
- multiplier *= 1024;
- if (end0 != 'M') {
- multiplier *= 1024;
- if (end0 != 'G') {
- multiplier *= 1024;
- if (end0 != 'T') {
- multiplier *= 1024;
- }
- }
- }
- }
- output = ival * multiplier;
- return true;
- }
- }
-
- return false;
- }
- //! Convert String to Float
- static bool StringToFloat(std::string input, float &output)
- {
- const char *cvalue = input.c_str();
- char *pEnd;
- float val = strtof(cvalue, &pEnd);
-
- if (pEnd[0] == '\0')
- {
- output = val;
- return true;
- }
- else
- return false;
- }
- //! Convert String to Bool
- static bool StringToBool(std::string input, bool &output)
- {
- if (input == "true" || input == "True" || input == "TRUE")
- {
- output = true;
- return true;
- }
- if (input == "false" || input == "False" || input == "FALSE")
- {
- output = false;
- return true;
- }
- return false;
- }
-
- // Trim String utils
- static std::string trim(const std::string& s)
- {
- return trimRight(trimLeft(s));
- }
-
- static std::string trimLeft(const std::string& s)
- {
- const char *WHITESPACE = " \n\r\t";
- size_t startpos = s.find_first_not_of(WHITESPACE);
- return (startpos == std::string::npos) ? "" : s.substr(startpos);
- }
-
- static std::string trimRight(const std::string& s)
- {
- const char *WHITESPACE = " \n\r\t";
- size_t endpos = s.find_last_not_of(WHITESPACE);
- return (endpos == std::string::npos) ? "" : s.substr(0, endpos+1);
- }
-
-protected:
- //! Name
- std::string _name;
- //! Description
- std::string _description;
- //! Value
- std::string _value;
-
-private:
-
-};
-
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/RealTimeDataCollector.h
----------------------------------------------------------------------
diff --git a/inc/RealTimeDataCollector.h b/inc/RealTimeDataCollector.h
deleted file mode 100644
index 760b566..0000000
--- a/inc/RealTimeDataCollector.h
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * @file RealTimeDataCollector.h
- * RealTimeDataCollector class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __REAL_TIME_DATA_COLLECTOR_H__
-#define __REAL_TIME_DATA_COLLECTOR_H__
-
-#include <stdio.h>
-#include <unistd.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <fcntl.h>
-#include <netdb.h>
-#include <string>
-#include <errno.h>
-#include "FlowFileRecord.h"
-#include "Processor.h"
-#include "ProcessSession.h"
-
-//! RealTimeDataCollector Class
-class RealTimeDataCollector : public Processor
-{
-public:
- //! Constructor
- /*!
- * Create a new processor
- */
- RealTimeDataCollector(std::string name, uuid_t uuid = NULL)
- : Processor(name, uuid)
- {
- _realTimeSocket = 0;
- _batchSocket = 0;
- _logger = Logger::getLogger();
- _firstInvoking = false;
- _realTimeAccumulated = 0;
- _batchAcccumulated = 0;
- _queuedDataSize = 0;
- }
- //! Destructor
- virtual ~RealTimeDataCollector()
- {
- if (_realTimeSocket)
- close(_realTimeSocket);
- if (_batchSocket)
- close(_batchSocket);
- if (_fileStream.is_open())
- _fileStream.close();
- }
- //! Processor Name
- static const std::string ProcessorName;
- //! Supported Properties
- static Property REALTIMESERVERNAME;
- static Property REALTIMESERVERPORT;
- static Property BATCHSERVERNAME;
- static Property BATCHSERVERPORT;
- static Property FILENAME;
- static Property ITERATION;
- static Property REALTIMEMSGID;
- static Property BATCHMSGID;
- static Property REALTIMEINTERVAL;
- static Property BATCHINTERVAL;
- static Property BATCHMAXBUFFERSIZE;
- //! Supported Relationships
- static Relationship Success;
- //! Connect to the socket
- int connectServer(const char *host, uint16_t port);
- int sendData(int socket, const char *buf, int buflen);
- void onTriggerRealTime(ProcessContext *context, ProcessSession *session);
- void onTriggerBatch(ProcessContext *context, ProcessSession *session);
-
-public:
- //! OnTrigger method, implemented by NiFi RealTimeDataCollector
- virtual void onTrigger(ProcessContext *context, ProcessSession *session);
- //! Initialize, over write by NiFi RealTimeDataCollector
- virtual void initialize(void);
-
-protected:
-
-private:
- //! realtime server Name
- std::string _realTimeServerName;
- int64_t _realTimeServerPort;
- std::string _batchServerName;
- int64_t _batchServerPort;
- int64_t _realTimeInterval;
- int64_t _batchInterval;
- int64_t _batchMaxBufferSize;
- //! Match pattern for Real time Message ID
- std::vector<std::string> _realTimeMsgID;
- //! Match pattern for Batch Message ID
- std::vector<std::string> _batchMsgID;
- //! file for which the realTime collector will tail
- std::string _fileName;
- //! Whether we need to iterate from the beginning for demo
- bool _iteration;
- int _realTimeSocket;
- int _batchSocket;
- //! Logger
- Logger *_logger;
- //! Mutex for protection
- std::mutex _mtx;
- //! Queued data size
- uint64_t _queuedDataSize;
- //! Queue for the batch process
- std::queue<std::string> _queue;
- std::thread::id _realTimeThreadId;
- std::thread::id _batchThreadId;
- std::atomic<bool> _firstInvoking;
- int64_t _realTimeAccumulated;
- int64_t _batchAcccumulated;
- std::ifstream _fileStream;
-};
-
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/Relationship.h
----------------------------------------------------------------------
diff --git a/inc/Relationship.h b/inc/Relationship.h
deleted file mode 100644
index 3454ee5..0000000
--- a/inc/Relationship.h
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * @file Relationship.h
- * Relationship class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __RELATIONSHIP_H__
-#define __RELATIONSHIP_H__
-
-#include <string>
-#include <uuid/uuid.h>
-#include <vector>
-#include <queue>
-#include <map>
-#include <mutex>
-#include <atomic>
-
-//! undefined relationship for remote process group outgoing port and root process group incoming port
-#define UNDEFINED_RELATIONSHIP "undefined"
-
-inline bool isRelationshipNameUndefined(std::string name)
-{
- if (name == UNDEFINED_RELATIONSHIP)
- return true;
- else
- return false;
-}
-
-//! Relationship Class
-class Relationship {
-
-public:
- //! Constructor
- /*!
- * Create a new relationship
- */
- Relationship(const std::string name, const std::string description)
- : _name(name), _description(description) {
- }
- Relationship()
- : _name(UNDEFINED_RELATIONSHIP) {
- }
- //! Destructor
- virtual ~Relationship() {
- }
- //! Get Name for the relationship
- std::string getName() {
- return _name;
- }
- //! Get Description for the relationship
- std::string getDescription() {
- return _description;
- }
- //! Compare
- bool operator < (const Relationship & right) const {
- return _name < right._name;
- }
- //! Whether it is a undefined relationship
- bool isRelationshipUndefined()
- {
- return isRelationshipNameUndefined(_name);
- }
-
-protected:
-
- //! Name
- std::string _name;
- //! Description
- std::string _description;
-
-private:
-};
-
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/RemoteProcessorGroupPort.h
----------------------------------------------------------------------
diff --git a/inc/RemoteProcessorGroupPort.h b/inc/RemoteProcessorGroupPort.h
deleted file mode 100644
index cd99e50..0000000
--- a/inc/RemoteProcessorGroupPort.h
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * @file RemoteProcessorGroupPort.h
- * RemoteProcessorGroupPort class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __REMOTE_PROCESSOR_GROUP_PORT_H__
-#define __REMOTE_PROCESSOR_GROUP_PORT_H__
-
-#include "FlowFileRecord.h"
-#include "Processor.h"
-#include "ProcessSession.h"
-#include "Site2SiteClientProtocol.h"
-
-//! RemoteProcessorGroupPort Class
-class RemoteProcessorGroupPort : public Processor
-{
-public:
- //! Constructor
- /*!
- * Create a new processor
- */
- RemoteProcessorGroupPort(std::string name, uuid_t uuid = NULL)
- : Processor(name, uuid)
- {
- _logger = Logger::getLogger();
- _peer = new Site2SitePeer("", 9999);
- _protocol = new Site2SiteClientProtocol(_peer);
- _protocol->setPortId(uuid);
- }
- //! Destructor
- virtual ~RemoteProcessorGroupPort()
- {
- delete _protocol;
- delete _peer;
- }
- //! Processor Name
- static const std::string ProcessorName;
- //! Supported Properties
- static Property hostName;
- static Property port;
- //! Supported Relationships
- static Relationship relation;
-public:
- //! OnTrigger method, implemented by NiFi RemoteProcessorGroupPort
- virtual void onTrigger(ProcessContext *context, ProcessSession *session);
- //! Initialize, over write by NiFi RemoteProcessorGroupPort
- virtual void initialize(void);
- //! Set Direction
- void setDirection(TransferDirection direction)
- {
- _direction = direction;
- if (_direction == RECEIVE)
- this->setTriggerWhenEmpty(true);
- }
- //! Set Timeout
- void setTimeOut(uint64_t timeout)
- {
- _protocol->setTimeOut(timeout);
- }
- //! SetTransmitting
- void setTransmitting(bool val)
- {
- _transmitting = val;
- }
-
-protected:
-
-private:
- //! Logger
- Logger *_logger;
- //! Peer Connection
- Site2SitePeer *_peer;
- //! Peer Protocol
- Site2SiteClientProtocol *_protocol;
- //! Transaction Direction
- TransferDirection _direction;
- //! Transmitting
- bool _transmitting;
-
-};
-
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/ResourceClaim.h
----------------------------------------------------------------------
diff --git a/inc/ResourceClaim.h b/inc/ResourceClaim.h
deleted file mode 100644
index d8f9979..0000000
--- a/inc/ResourceClaim.h
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * @file ResourceClaim.h
- * Resource Claim class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __RESOURCE_CLAIM_H__
-#define __RESOURCE_CLAIM_H__
-
-#include <string>
-#include <uuid/uuid.h>
-#include <vector>
-#include <queue>
-#include <map>
-#include <mutex>
-#include <atomic>
-#include "Configure.h"
-
-//! Default content directory
-#define DEFAULT_CONTENT_DIRECTORY "."
-
-//! ResourceClaim Class
-class ResourceClaim {
-
-public:
- //! Constructor
- /*!
- * Create a new resource claim
- */
- ResourceClaim(const std::string contentDirectory);
- //! Destructor
- virtual ~ResourceClaim() {}
- //! increaseFlowFileRecordOwnedCount
- void increaseFlowFileRecordOwnedCount()
- {
- ++_flowFileRecordOwnedCount;
- }
- //! decreaseFlowFileRecordOwenedCount
- void decreaseFlowFileRecordOwnedCount()
- {
- --_flowFileRecordOwnedCount;
- }
- //! getFlowFileRecordOwenedCount
- uint64_t getFlowFileRecordOwnedCount()
- {
- return _flowFileRecordOwnedCount;
- }
- //! Get the content full path
- std::string getContentFullPath()
- {
- return _contentFullPath;
- }
-
-protected:
- //! A global unique identifier
- uuid_t _uuid;
- //! A local unique identifier
- uint64_t _id;
- //! Full path to the content
- std::string _contentFullPath;
-
- //! How many FlowFileRecord Own this cliam
- std::atomic<uint64_t> _flowFileRecordOwnedCount;
-
-private:
- //! Configure
- Configure *_configure;
- //! Logger
- Logger *_logger;
- // Prevent default copy constructor and assignment operation
- // Only support pass by reference or pointer
- ResourceClaim(const ResourceClaim &parent);
- ResourceClaim &operator=(const ResourceClaim &parent);
-
- //! Local resource claim number
- static std::atomic<uint64_t> _localResourceClaimNumber;
-};
-
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/SchedulingAgent.h
----------------------------------------------------------------------
diff --git a/inc/SchedulingAgent.h b/inc/SchedulingAgent.h
deleted file mode 100644
index 2e3f6b8..0000000
--- a/inc/SchedulingAgent.h
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * @file SchedulingAgent.h
- * SchedulingAgent class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __SCHEDULING_AGENT_H__
-#define __SCHEDULING_AGENT_H__
-
-#include <uuid/uuid.h>
-#include <vector>
-#include <map>
-#include <mutex>
-#include <atomic>
-#include <algorithm>
-#include <thread>
-#include "TimeUtil.h"
-#include "Logger.h"
-#include "Configure.h"
-#include "FlowFileRecord.h"
-#include "Processor.h"
-#include "ProcessContext.h"
-
-//! SchedulingAgent Class
-class SchedulingAgent
-{
-public:
- //! Constructor
- /*!
- * Create a new processor
- */
- SchedulingAgent() {
- _configure = Configure::getConfigure();
- _logger = Logger::getLogger();
- _running = false;
- }
- //! Destructor
- virtual ~SchedulingAgent()
- {
-
- }
- //! onTrigger, return whether the yield is need
- bool onTrigger(Processor *processor);
- //! Whether agent has work to do
- bool hasWorkToDo(Processor *processor);
- //! Whether the outgoing need to be backpressure
- bool hasTooMuchOutGoing(Processor *processor);
- //! start
- void start() {
- _running = true;
- }
- //! stop
- void stop() {
- _running = false;
- }
-
-public:
- //! schedule, overwritten by different DrivenSchedulingAgent
- virtual void schedule(Processor *processor) = 0;
- //! unschedule, overwritten by different DrivenSchedulingAgent
- virtual void unschedule(Processor *processor) = 0;
-
-protected:
- //! Logger
- Logger *_logger;
- //! Configure
- Configure *_configure;
- //! Mutex for protection
- std::mutex _mtx;
- //! Whether it is running
- std::atomic<bool> _running;
- //! AdministrativeYieldDuration
- int64_t _administrativeYieldDuration;
- //! BoredYieldDuration
- int64_t _boredYieldDuration;
-
-private:
- // Prevent default copy constructor and assignment operation
- // Only support pass by reference or pointer
- SchedulingAgent(const SchedulingAgent &parent);
- SchedulingAgent &operator=(const SchedulingAgent &parent);
-
-};
-
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/Site2SiteClientProtocol.h
----------------------------------------------------------------------
diff --git a/inc/Site2SiteClientProtocol.h b/inc/Site2SiteClientProtocol.h
deleted file mode 100644
index 5b72b11..0000000
--- a/inc/Site2SiteClientProtocol.h
+++ /dev/null
@@ -1,638 +0,0 @@
-/**
- * @file Site2SiteClientProtocol.h
- * Site2SiteClientProtocol class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __SITE2SITE_CLIENT_PROTOCOL_H__
-#define __SITE2SITE_CLIENT_PROTOCOL_H__
-
-#include <stdio.h>
-#include <unistd.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <fcntl.h>
-#include <netdb.h>
-#include <string>
-#include <errno.h>
-#include <chrono>
-#include <thread>
-#include <algorithm>
-#include <uuid/uuid.h>
-#include "Logger.h"
-#include "Configure.h"
-#include "Property.h"
-#include "Site2SitePeer.h"
-#include "FlowFileRecord.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-//! Resource Negotiated Status Code
-#define RESOURCE_OK 20
-#define DIFFERENT_RESOURCE_VERSION 21
-#define NEGOTIATED_ABORT 255
-// ! Max attributes
-#define MAX_NUM_ATTRIBUTES 25000
-
-/**
- * An enumeration for specifying the direction in which data should be
- * transferred between a client and a remote NiFi instance.
- */
-typedef enum {
- /**
- * * The client is to send data to the remote instance.
- * */
- SEND,
- /**
- * * The client is to receive data from the remote instance.
- * */
- RECEIVE
-} TransferDirection;
-
-
-//! Peer State
-typedef enum {
- /**
- * * IDLE
- * */
- IDLE = 0,
- /**
- * * Socket Established
- * */
- ESTABLISHED,
- /**
- * * HandShake Done
- * */
- HANDSHAKED,
- /**
- * * After CodeDec Completion
- * */
- READY
-} PeerState;
-
-//! Transaction State
-typedef enum {
- /**
- * * Transaction has been started but no data has been sent or received.
- * */
- TRANSACTION_STARTED,
- /**
- * * Transaction has been started and data has been sent or received.
- * */
- DATA_EXCHANGED,
- /**
- * * Data that has been transferred has been confirmed via its CRC.
- * * Transaction is ready to be completed.
- * */
- TRANSACTION_CONFIRMED,
- /**
- * * Transaction has been successfully completed.
- * */
- TRANSACTION_COMPLETED,
- /**
- * * The Transaction has been canceled.
- * */
- TRANSACTION_CANCELED,
- /**
- * * The Transaction ended in an error.
- * */
- TRANSACTION_ERROR
-} TransactionState;
-
-//! Request Type
-typedef enum {
- NEGOTIATE_FLOWFILE_CODEC = 0,
- REQUEST_PEER_LIST,
- SEND_FLOWFILES,
- RECEIVE_FLOWFILES,
- SHUTDOWN,
- MAX_REQUEST_TYPE
-} RequestType;
-
-//! Request Type Str
-static const char *RequestTypeStr[MAX_REQUEST_TYPE] =
-{
- "NEGOTIATE_FLOWFILE_CODEC",
- "REQUEST_PEER_LIST",
- "SEND_FLOWFILES",
- "RECEIVE_FLOWFILES",
- "SHUTDOWN"
-};
-
-//! Respond Code
-typedef enum {
- RESERVED = 0,
- // ResponseCode, so that we can indicate a 0 followed by some other bytes
-
- // handshaking properties
- PROPERTIES_OK = 1,
- UNKNOWN_PROPERTY_NAME = 230,
- ILLEGAL_PROPERTY_VALUE = 231,
- MISSING_PROPERTY = 232,
- // transaction indicators
- CONTINUE_TRANSACTION = 10,
- FINISH_TRANSACTION = 11,
- CONFIRM_TRANSACTION = 12, // "Explanation" of this code is the checksum
- TRANSACTION_FINISHED = 13,
- TRANSACTION_FINISHED_BUT_DESTINATION_FULL = 14,
- CANCEL_TRANSACTION = 15,
- BAD_CHECKSUM = 19,
- // data availability indicators
- MORE_DATA = 20,
- NO_MORE_DATA = 21,
- // port state indicators
- UNKNOWN_PORT = 200,
- PORT_NOT_IN_VALID_STATE = 201,
- PORTS_DESTINATION_FULL = 202,
- // authorization
- UNAUTHORIZED = 240,
- // error indicators
- ABORT = 250,
- UNRECOGNIZED_RESPONSE_CODE = 254,
- END_OF_STREAM = 255
-} RespondCode;
-
-//! Respond Code Class
-typedef struct {
- RespondCode code;
- const char *description;
- bool hasDescription;
-} RespondCodeContext;
-
-//! Respond Code Context
-static RespondCodeContext respondCodeContext[] =
-{
- {RESERVED, "Reserved for Future Use", false},
- {PROPERTIES_OK, "Properties OK", false},
- {UNKNOWN_PROPERTY_NAME, "Unknown Property Name", true},
- {ILLEGAL_PROPERTY_VALUE, "Illegal Property Value", true},
- {MISSING_PROPERTY, "Missing Property", true},
- {CONTINUE_TRANSACTION, "Continue Transaction", false},
- {FINISH_TRANSACTION, "Finish Transaction", false},
- {CONFIRM_TRANSACTION, "Confirm Transaction", true},
- {TRANSACTION_FINISHED, "Transaction Finished", false},
- {TRANSACTION_FINISHED_BUT_DESTINATION_FULL, "Transaction Finished But Destination is Full", false},
- {CANCEL_TRANSACTION, "Cancel Transaction", true},
- {BAD_CHECKSUM, "Bad Checksum", false},
- {MORE_DATA, "More Data Exists", false},
- {NO_MORE_DATA, "No More Data Exists", false},
- {UNKNOWN_PORT, "Unknown Port", false},
- {PORT_NOT_IN_VALID_STATE, "Port Not in a Valid State", true},
- {PORTS_DESTINATION_FULL, "Port's Destination is Full", false},
- {UNAUTHORIZED, "User Not Authorized", true},
- {ABORT, "Abort", true},
- {UNRECOGNIZED_RESPONSE_CODE, "Unrecognized Response Code", false},
- {END_OF_STREAM, "End of Stream", false}
-};
-
-//! Respond Code Sequence Pattern
-static const uint8_t CODE_SEQUENCE_VALUE_1 = (uint8_t) 'R';
-static const uint8_t CODE_SEQUENCE_VALUE_2 = (uint8_t) 'C';
-
-/**
- * Enumeration of Properties that can be used for the Site-to-Site Socket
- * Protocol.
- */
-typedef enum {
- /**
- * Boolean value indicating whether or not the contents of a FlowFile should
- * be GZipped when transferred.
- */
- GZIP,
- /**
- * The unique identifier of the port to communicate with
- */
- PORT_IDENTIFIER,
- /**
- * Indicates the number of milliseconds after the request was made that the
- * client will wait for a response. If no response has been received by the
- * time this value expires, the server can move on without attempting to
- * service the request because the client will have already disconnected.
- */
- REQUEST_EXPIRATION_MILLIS,
- /**
- * The preferred number of FlowFiles that the server should send to the
- * client when pulling data. This property was introduced in version 5 of
- * the protocol.
- */
- BATCH_COUNT,
- /**
- * The preferred number of bytes that the server should send to the client
- * when pulling data. This property was introduced in version 5 of the
- * protocol.
- */
- BATCH_SIZE,
- /**
- * The preferred amount of time that the server should send data to the
- * client when pulling data. This property was introduced in version 5 of
- * the protocol. Value is in milliseconds.
- */
- BATCH_DURATION,
- MAX_HANDSHAKE_PROPERTY
-} HandshakeProperty;
-
-//! HandShakeProperty Str
-static const char *HandShakePropertyStr[MAX_HANDSHAKE_PROPERTY] =
-{
- /**
- * Boolean value indicating whether or not the contents of a FlowFile should
- * be GZipped when transferred.
- */
- "GZIP",
- /**
- * The unique identifier of the port to communicate with
- */
- "PORT_IDENTIFIER",
- /**
- * Indicates the number of milliseconds after the request was made that the
- * client will wait for a response. If no response has been received by the
- * time this value expires, the server can move on without attempting to
- * service the request because the client will have already disconnected.
- */
- "REQUEST_EXPIRATION_MILLIS",
- /**
- * The preferred number of FlowFiles that the server should send to the
- * client when pulling data. This property was introduced in version 5 of
- * the protocol.
- */
- "BATCH_COUNT",
- /**
- * The preferred number of bytes that the server should send to the client
- * when pulling data. This property was introduced in version 5 of the
- * protocol.
- */
- "BATCH_SIZE",
- /**
- * The preferred amount of time that the server should send data to the
- * client when pulling data. This property was introduced in version 5 of
- * the protocol. Value is in milliseconds.
- */
- "BATCH_DURATION"
-};
-
-class Site2SiteClientProtocol;
-
-//! Transaction Class
-class Transaction
-{
- friend class Site2SiteClientProtocol;
-public:
- //! Constructor
- /*!
- * Create a new transaction
- */
- Transaction(TransferDirection direction) {
- _state = TRANSACTION_STARTED;
- _direction = direction;
- _dataAvailable = false;
- _transfers = 0;
- _bytes = 0;
-
- char uuidStr[37];
-
- // Generate the global UUID for the transaction
- uuid_generate(_uuid);
- uuid_unparse(_uuid, uuidStr);
- _uuidStr = uuidStr;
- }
- //! Destructor
- virtual ~Transaction()
- {
- }
- //! getUUIDStr
- std::string getUUIDStr()
- {
- return _uuidStr;
- }
- //! getState
- TransactionState getState()
- {
- return _state;
- }
- //! isDataAvailable
- bool isDataAvailable()
- {
- return _dataAvailable;
- }
- //! setDataAvailable()
- void setDataAvailable(bool value)
- {
- _dataAvailable = value;
- }
- //! getDirection
- TransferDirection getDirection()
- {
- return _direction;
- }
- //! getCRC
- long getCRC()
- {
- return _crc.getCRC();
- }
- //! updateCRC
- void updateCRC(uint8_t *buffer, uint32_t length)
- {
- _crc.update(buffer, length);
- }
-
-protected:
-
-private:
- //! Transaction State
- TransactionState _state;
- //! Transaction Direction
- TransferDirection _direction;
- //! Whether received data is available
- bool _dataAvailable;
- //! A global unique identifier
- uuid_t _uuid;
- //! UUID string
- std::string _uuidStr;
- //! Number of transfer
- int _transfers;
- //! Number of content bytes
- uint64_t _bytes;
- //! CRC32
- CRC32 _crc;
-
- // Prevent default copy constructor and assignment operation
- // Only support pass by reference or pointer
- Transaction(const Transaction &parent);
- Transaction &operator=(const Transaction &parent);
-};
-
-/**
- * Represents a piece of data that is to be sent to or that was received from a
- * NiFi instance.
- */
-class DataPacket
-{
-public:
- DataPacket(Site2SiteClientProtocol *protocol, Transaction *transaction,
- std::map<std::string, std::string> attributes) {
- _protocol = protocol;
- _size = 0;
- _transaction = transaction;
- _attributes = attributes;
- }
- std::map<std::string, std::string> _attributes;
- uint64_t _size;
- Site2SiteClientProtocol *_protocol;
- Transaction *_transaction;
-};
-
-//! Site2SiteClientProtocol Class
-class Site2SiteClientProtocol
-{
-public:
- //! Constructor
- /*!
- * Create a new control protocol
- */
- Site2SiteClientProtocol(Site2SitePeer *peer) {
- _logger = Logger::getLogger();
- _configure = Configure::getConfigure();
- _peer = peer;
- _batchSize = 0;
- _batchCount = 0;
- _batchDuration = 0;
- _batchSendNanos = 5000000000; // 5 seconds
- _timeOut = 30000; // 30 seconds
- _peerState = IDLE;
- _supportedVersion[0] = 5;
- _supportedVersion[1] = 4;
- _supportedVersion[2] = 3;
- _supportedVersion[3] = 2;
- _supportedVersion[4] = 1;
- _currentVersion = _supportedVersion[0];
- _currentVersionIndex = 0;
- _supportedCodecVersion[0] = 1;
- _currentCodecVersion = _supportedCodecVersion[0];
- _currentCodecVersionIndex = 0;
- }
- //! Destructor
- virtual ~Site2SiteClientProtocol()
- {
- }
-
-public:
- //! setBatchSize
- void setBatchSize(uint64_t size)
- {
- _batchSize = size;
- }
- //! setBatchCount
- void setBatchCount(uint64_t count)
- {
- _batchCount = count;
- }
- //! setBatchDuration
- void setBatchDuration(uint64_t duration)
- {
- _batchDuration = duration;
- }
- //! setTimeOut
- void setTimeOut(uint64_t time)
- {
- _timeOut = time;
- if (_peer)
- _peer->setTimeOut(time);
-
- }
- //! getTimeout
- uint64_t getTimeOut()
- {
- return _timeOut;
- }
- //! setPortId
- void setPortId(uuid_t id)
- {
- uuid_copy(_portId, id);
- char idStr[37];
- uuid_unparse(id, idStr);
- _portIdStr = idStr;
- }
- //! getResourceName
- std::string getResourceName()
- {
- return "SocketFlowFileProtocol";
- }
- //! getCodecResourceName
- std::string getCodecResourceName()
- {
- return "StandardFlowFileCodec";
- }
- //! bootstrap the protocol to the ready for transaction state by going through the state machine
- bool bootstrap();
- //! establish
- bool establish();
- //! handShake
- bool handShake();
- //! negotiateCodec
- bool negotiateCodec();
- //! initiateResourceNegotiation
- bool initiateResourceNegotiation();
- //! initiateCodecResourceNegotiation
- bool initiateCodecResourceNegotiation();
- //! tearDown
- void tearDown();
- //! write Request Type
- int writeRequestType(RequestType type);
- //! read Request Type
- int readRequestType(RequestType &type);
- //! read Respond
- int readRespond(RespondCode &code, std::string &message);
- //! write respond
- int writeRespond(RespondCode code, std::string message);
- //! getRespondCodeContext
- RespondCodeContext *getRespondCodeContext(RespondCode code)
- {
- for (unsigned int i = 0; i < sizeof(respondCodeContext)/sizeof(RespondCodeContext); i++)
- {
- if (respondCodeContext[i].code == code)
- {
- return &respondCodeContext[i];
- }
- }
- return NULL;
- }
- //! getPeer
- Site2SitePeer *getPeer()
- {
- return _peer;
- }
- //! Creation of a new transaction, return the transaction ID if success,
- //! Return NULL when any error occurs
- Transaction *createTransaction(std::string &transactionID, TransferDirection direction);
- //! Receive the data packet from the transaction
- //! Return false when any error occurs
- bool receive(std::string transactionID, DataPacket *packet, bool &eof);
- //! Send the data packet from the transaction
- //! Return false when any error occurs
- bool send(std::string transactionID, DataPacket *packet, FlowFileRecord *flowFile, ProcessSession *session);
- //! Confirm the data that was sent or received by comparing CRC32's of the data sent and the data received.
- bool confirm(std::string transactionID);
- //! Cancel the transaction
- void cancel(std::string transactionID);
- //! Complete the transaction
- bool complete(std::string transactionID);
- //! Error the transaction
- void error(std::string transactionID);
- //! Receive flow files for the process session
- void receiveFlowFiles(ProcessContext *context, ProcessSession *session);
- //! Transfer flow files for the process session
- void transferFlowFiles(ProcessContext *context, ProcessSession *session);
- //! deleteTransaction
- void deleteTransaction(std::string transactionID);
- //! Nest Callback Class for write stream
- class WriteCallback : public OutputStreamCallback
- {
- public:
- WriteCallback(DataPacket *packet)
- : _packet(packet) {}
- DataPacket *_packet;
- void process(std::ofstream *stream) {
- uint8_t buffer[8192];
- int len = _packet->_size;
- while (len > 0)
- {
- int size = std::min(len, (int) sizeof(buffer));
- int ret = _packet->_protocol->_peer->readData(buffer, size, &_packet->_transaction->_crc);
- if (ret != size)
- {
- _packet->_protocol->_logger->log_error("Site2Site Receive Flow Size %d Failed %d", size, ret);
- break;
- }
- stream->write((const char *) buffer, size);
- len -= size;
- }
- }
- };
- //! Nest Callback Class for read stream
- class ReadCallback : public InputStreamCallback
- {
- public:
- ReadCallback(DataPacket *packet)
- : _packet(packet) {}
- DataPacket *_packet;
- void process(std::ifstream *stream) {
- _packet->_size = 0;
- uint8_t buffer[8192];
- int readSize;
- while (stream->good())
- {
- if (!stream->read((char *)buffer, 8192))
- readSize = stream->gcount();
- else
- readSize = 8192;
- int ret = _packet->_protocol->_peer->write(buffer, readSize, &_packet->_transaction->_crc);
- if (ret != readSize)
- {
- _packet->_protocol->_logger->log_error("Site2Site Send Flow Size %d Failed %d", readSize, ret);
- break;
- }
- _packet->_size += readSize;
- }
- }
- };
-
-protected:
-
-private:
- //! Mutex for protection
- std::mutex _mtx;
- //! Logger
- Logger *_logger;
- //! Configure
- Configure *_configure;
- //! Batch Count
- std::atomic<uint64_t> _batchCount;
- //! Batch Size
- std::atomic<uint64_t> _batchSize;
- //! Batch Duration in msec
- std::atomic<uint64_t> _batchDuration;
- //! Timeout in msec
- std::atomic<uint64_t> _timeOut;
- //! Peer Connection
- Site2SitePeer *_peer;
- //! portId
- uuid_t _portId;
- //! portIDStr
- std::string _portIdStr;
- //! BATCH_SEND_NANOS
- uint64_t _batchSendNanos;
- //! Peer State
- PeerState _peerState;
- uint32_t _supportedVersion[5];
- uint32_t _currentVersion;
- int _currentVersionIndex;
- uint32_t _supportedCodecVersion[1];
- uint32_t _currentCodecVersion;
- int _currentCodecVersionIndex;
- //! commsIdentifier
- std::string _commsIdentifier;
- //! transaction map
- std::map<std::string, Transaction *> _transactionMap;
-
- // Prevent default copy constructor and assignment operation
- // Only support pass by reference or pointer
- Site2SiteClientProtocol(const Site2SiteClientProtocol &parent);
- Site2SiteClientProtocol &operator=(const Site2SiteClientProtocol &parent);
-};
-
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/Site2SitePeer.h
----------------------------------------------------------------------
diff --git a/inc/Site2SitePeer.h b/inc/Site2SitePeer.h
deleted file mode 100644
index ff11637..0000000
--- a/inc/Site2SitePeer.h
+++ /dev/null
@@ -1,364 +0,0 @@
-/**
- * @file Site2SitePeer.h
- * Site2SitePeer class declaration for site to site peer
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __SITE2SITE_PEER_H__
-#define __SITE2SITE_PEER_H__
-
-#include <stdio.h>
-#include <unistd.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <fcntl.h>
-#include <netdb.h>
-#include <string>
-#include <errno.h>
-#include <mutex>
-#include <atomic>
-#include "TimeUtil.h"
-#include "Logger.h"
-#include "Configure.h"
-#include "Property.h"
-
-class CRC32
-{
-public:
- CRC32() {
- crc = 0;
-
- if (tableInit)
- return;
-
- tableInit = true;
- unsigned int poly = 0xedb88320;
- unsigned int temp = 0;
- for(unsigned int i = 0; i < 256; ++i) {
- temp = i;
- for(int j = 8; j > 0; --j) {
- if((temp & 1) == 1) {
- temp = (unsigned int)((temp >> 1) ^ poly);
- }else {
- temp >>= 1;
- }
- }
- table[i] = temp;
- }
- }
-
- unsigned int update(uint8_t * bytes, size_t size) {
- crc = crc ^ ~0U;
- for(unsigned int i = 0; i < size; ++i) {
- uint8_t index = (uint8_t)(((crc) & 0xff) ^ bytes[i]);
- crc = (unsigned int)((crc >> 8) ^ table[index]);
- }
- crc = crc ^ ~0U;
- return crc;
- }
-
- long getCRC()
- {
- return crc;
- }
-
-private:
- static unsigned int table[256];
- static std::atomic<bool> tableInit;
- unsigned int crc;
-};
-
-static const char MAGIC_BYTES[] = {'N', 'i', 'F', 'i'};
-
-//! Site2SitePeer Class
-class Site2SitePeer
-{
-public:
- //! Constructor
- /*!
- * Create a new site2site peer
- */
- Site2SitePeer(std::string host, uint16_t port) {
- _logger = Logger::getLogger();
- _configure = Configure::getConfigure();
- _socket = 0;
- _host = host;
- _port = port;
- _yieldExpiration = 0;
- _timeOut = 30000; // 30 seconds
- _url = "nifi://" + _host + ":" + std::to_string(_port);
- }
- //! Destructor
- virtual ~Site2SitePeer() { Close();}
- //! Set Processor yield period in MilliSecond
- void setYieldPeriodMsec(uint64_t period) {
- _yieldPeriodMsec = period;
- }
- //! get URL
- std::string getURL() {
- return _url;
- }
- //! Get Processor yield period in MilliSecond
- uint64_t getYieldPeriodMsec(void) {
- return(_yieldPeriodMsec);
- }
- //! Yield based on the yield period
- void yield()
- {
- _yieldExpiration = (getTimeMillis() + _yieldPeriodMsec);
- }
- //! setHostName
- void setHostName(std::string host)
- {
- _host = host;
- _url = "nifi://" + _host + ":" + std::to_string(_port);
- }
- //! setPort
- void setPort(uint16_t port)
- {
- _port = port;
- _url = "nifi://" + _host + ":" + std::to_string(_port);
- }
- //! getHostName
- std::string getHostName()
- {
- return _host;
- }
- //! getPort
- uint16_t getPort()
- {
- return _port;
- }
- //! Yield based on the input time
- void yield(uint64_t time)
- {
- _yieldExpiration = (getTimeMillis() + time);
- }
- //! whether need be to yield
- bool isYield()
- {
- if (_yieldExpiration > 0)
- return (_yieldExpiration >= getTimeMillis());
- else
- return false;
- }
- // clear yield expiration
- void clearYield()
- {
- _yieldExpiration = 0;
- }
- //! Yield based on the yield period
- void yield(std::string portId)
- {
- std::lock_guard<std::mutex> lock(_mtx);
- uint64_t yieldExpiration = (getTimeMillis() + _yieldPeriodMsec);
- _yieldExpirationPortIdMap[portId] = yieldExpiration;
- }
- //! Yield based on the input time
- void yield(std::string portId, uint64_t time)
- {
- std::lock_guard<std::mutex> lock(_mtx);
- uint64_t yieldExpiration = (getTimeMillis() + time);
- _yieldExpirationPortIdMap[portId] = yieldExpiration;
- }
- //! whether need be to yield
- bool isYield(std::string portId)
- {
- std::lock_guard<std::mutex> lock(_mtx);
- std::map<std::string, uint64_t>::iterator it = this->_yieldExpirationPortIdMap.find(portId);
- if (it != _yieldExpirationPortIdMap.end())
- {
- uint64_t yieldExpiration = it->second;
- return (yieldExpiration >= getTimeMillis());
- }
- else
- {
- return false;
- }
- }
- //! clear yield expiration
- void clearYield(std::string portId)
- {
- std::lock_guard<std::mutex> lock(_mtx);
- std::map<std::string, uint64_t>::iterator it = this->_yieldExpirationPortIdMap.find(portId);
- if (it != _yieldExpirationPortIdMap.end())
- {
- _yieldExpirationPortIdMap.erase(portId);
- }
- }
- //! setTimeOut
- void setTimeOut(uint64_t time)
- {
- _timeOut = time;
- }
- //! getTimeOut
- uint64_t getTimeOut()
- {
- return _timeOut;
- }
- int write(uint8_t value, CRC32 *crc = NULL)
- {
- return sendData(&value, 1, crc);
- }
- int write(char value, CRC32 *crc = NULL)
- {
- return sendData((uint8_t *)&value, 1, crc);
- }
- int write(uint32_t value, CRC32 *crc = NULL)
- {
- uint8_t temp[4];
-
- temp[0] = (value & 0xFF000000) >> 24;
- temp[1] = (value & 0x00FF0000) >> 16;
- temp[2] = (value & 0x0000FF00) >> 8;
- temp[3] = (value & 0x000000FF);
- return sendData(temp, 4, crc);
- }
- int write(uint16_t value, CRC32 *crc = NULL)
- {
- uint8_t temp[2];
- temp[0] = (value & 0xFF00) >> 8;
- temp[1] = (value & 0xFF);
- return sendData(temp, 2, crc);
- }
- int write(uint8_t *value, int len, CRC32 *crc = NULL)
- {
- return sendData(value, len, crc);
- }
- int write(uint64_t value, CRC32 *crc = NULL)
- {
- uint8_t temp[8];
-
- temp[0] = (value >> 56) & 0xFF;
- temp[1] = (value >> 48) & 0xFF;
- temp[2] = (value >> 40) & 0xFF;
- temp[3] = (value >> 32) & 0xFF;
- temp[4] = (value >> 24) & 0xFF;
- temp[5] = (value >> 16) & 0xFF;
- temp[6] = (value >> 8) & 0xFF;
- temp[7] = (value >> 0) & 0xFF;
- return sendData(temp, 8, crc);
- }
- int write(bool value, CRC32 *crc = NULL)
- {
- uint8_t temp = value;
- return write(temp, crc);
- }
- int writeUTF(std::string str, bool widen = false, CRC32 *crc = NULL);
- int read(uint8_t &value, CRC32 *crc = NULL)
- {
- uint8_t buf;
-
- int ret = readData(&buf, 1, crc);
- if (ret == 1)
- value = buf;
- return ret;
- }
- int read(uint16_t &value, CRC32 *crc = NULL)
- {
- uint8_t buf[2];
-
- int ret = readData(buf, 2, crc);
- if (ret == 2)
- value = (buf[0] << 8) | buf[1];
- return ret;
- }
- int read(char &value, CRC32 *crc = NULL)
- {
- uint8_t buf;
-
- int ret = readData(&buf, 1, crc);
- if (ret == 1)
- value = (char) buf;
- return ret;
- }
- int read(uint8_t *value, int len, CRC32 *crc = NULL)
- {
- return readData(value, len, crc);
- }
- int read(uint32_t &value, CRC32 *crc = NULL)
- {
- uint8_t buf[4];
-
- int ret = readData(buf, 4, crc);
- if (ret == 4)
- value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
- return ret;
- }
- int read(uint64_t &value, CRC32 *crc = NULL)
- {
- uint8_t buf[8];
-
- int ret = readData(buf, 8, crc);
- if (ret == 8)
- {
- value = ((uint64_t) buf[0] << 56) |
- ((uint64_t) (buf[1] & 255) << 48) |
- ((uint64_t) (buf[2] & 255) << 40) |
- ((uint64_t) (buf[3] & 255) << 32) |
- ((uint64_t) (buf[4] & 255) << 24) |
- ((uint64_t) (buf[5] & 255) << 16) |
- ((uint64_t) (buf[6] & 255) << 8) |
- ((uint64_t) (buf[7] & 255) << 0);
- }
- return ret;
- }
- int readUTF(std::string &str, bool widen = false, CRC32 *crc = NULL);
- //! open connection to the peer
- bool Open();
- //! close connection to the peer
- void Close();
- //! Send Data via the socket, return -1 for failure
- int sendData(uint8_t *buf, int buflen, CRC32 *crc = NULL);
- //! Read length into buf, return -1 for failure and 0 for EOF
- int readData(uint8_t *buf, int buflen, CRC32 *crc = NULL);
- //! Select on the socket
- int Select(int msec);
-
-protected:
-
-private:
- //! Mutex for protection
- std::mutex _mtx;
- //! S2S server Name
- std::string _host;
- //! S2S server port
- uint16_t _port;
- //! socket to server
- int _socket;
- //! URL
- std::string _url;
- //! socket timeout;
- std::atomic<uint64_t> _timeOut;
- //! Logger
- Logger *_logger;
- //! Configure
- Configure *_configure;
- //! Yield Period in Milliseconds
- std::atomic<uint64_t> _yieldPeriodMsec;
- //! Yield Expiration
- std::atomic<uint64_t> _yieldExpiration;
- //! Yield Expiration per destination PortID
- std::map<std::string, uint64_t> _yieldExpirationPortIdMap;
- // Prevent default copy constructor and assignment operation
- // Only support pass by reference or pointer
- Site2SitePeer(const Site2SitePeer &parent);
- Site2SitePeer &operator=(const Site2SitePeer &parent);
-};
-
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/TailFile.h
----------------------------------------------------------------------
diff --git a/inc/TailFile.h b/inc/TailFile.h
deleted file mode 100644
index 5c4ba09..0000000
--- a/inc/TailFile.h
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * @file TailFile.h
- * TailFile class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __TAIL_FILE_H__
-#define __TAIL_FILE_H__
-
-#include "FlowFileRecord.h"
-#include "Processor.h"
-#include "ProcessSession.h"
-
-//! TailFile Class
-class TailFile : public Processor
-{
-public:
- //! Constructor
- /*!
- * Create a new processor
- */
- TailFile(std::string name, uuid_t uuid = NULL)
- : Processor(name, uuid)
- {
- _logger = Logger::getLogger();
- _stateRecovered = false;
- }
- //! Destructor
- virtual ~TailFile()
- {
- storeState();
- }
- //! Processor Name
- static const std::string ProcessorName;
- //! Supported Properties
- static Property FileName;
- static Property StateFile;
- //! Supported Relationships
- static Relationship Success;
-
-public:
- //! OnTrigger method, implemented by NiFi TailFile
- virtual void onTrigger(ProcessContext *context, ProcessSession *session);
- //! Initialize, over write by NiFi TailFile
- virtual void initialize(void);
- //! recoverState
- void recoverState();
- //! storeState
- void storeState();
-
-protected:
-
-private:
- //! Logger
- Logger *_logger;
- std::string _fileLocation;
- //! Property Specified Tailed File Name
- std::string _fileName;
- //! File to save state
- std::string _stateFile;
- //! State related to the tailed file
- std::string _currentTailFileName;
- uint64_t _currentTailFilePosition;
- bool _stateRecovered;
- uint64_t _currentTailFileCreatedTime;
- //! Utils functions for parse state file
- std::string trimLeft(const std::string& s);
- std::string trimRight(const std::string& s);
- void parseStateFileLine(char *buf);
- void checkRollOver();
-
-};
-
-//! Matched File Item for Roll over check
-typedef struct {
- std::string fileName;
- uint64_t modifiedTime;
-} TailMatchedFileItem;
-
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/TimeUtil.h
----------------------------------------------------------------------
diff --git a/inc/TimeUtil.h b/inc/TimeUtil.h
deleted file mode 100644
index b024245..0000000
--- a/inc/TimeUtil.h
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * @file TimeUtil.h
- * Basic Time Utility
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __TIME_UTIL_H__
-#define __TIME_UTIL_H__
-
-#include <time.h>
-#include <sys/time.h>
-#include <string.h>
-#include <unistd.h>
-#include <string.h>
-#include <iostream>
-
-#ifdef __MACH__
-#include <mach/clock.h>
-#include <mach/mach.h>
-#endif
-
-inline uint64_t getTimeMillis()
-{
- uint64_t value;
-
- timeval time;
- gettimeofday(&time, NULL);
- value = ((uint64_t) (time.tv_sec) * 1000) + (time.tv_usec / 1000);
-
- return value;
-}
-
-inline uint64_t getTimeNano()
-{
- struct timespec ts;
-
-#ifdef __MACH__ // OS X does not have clock_gettime, use clock_get_time
- clock_serv_t cclock;
- mach_timespec_t mts;
- host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
- clock_get_time(cclock, &mts);
- mach_port_deallocate(mach_task_self(), cclock);
- ts.tv_sec = mts.tv_sec;
- ts.tv_nsec = mts.tv_nsec;
-#else
- clock_gettime(CLOCK_REALTIME, &ts);
-#endif
-
- return ((uint64_t) (ts.tv_sec) * 1000000000 + ts.tv_nsec);
-}
-
-//! Convert millisecond since UTC to a time display string
-inline std::string getTimeStr(uint64_t msec)
-{
- char date[120];
- time_t second = (time_t) (msec/1000);
- msec = msec % 1000;
- strftime(date, sizeof(date) / sizeof(*date), "%Y-%m-%d %H:%M:%S",
- localtime(&second));
-
- std::string ret = date;
- date[0] = '\0';
- sprintf(date, ".%03llu", (unsigned long long) msec);
-
- ret += date;
- return ret;
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/TimerDrivenSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/inc/TimerDrivenSchedulingAgent.h b/inc/TimerDrivenSchedulingAgent.h
deleted file mode 100644
index 9195745..0000000
--- a/inc/TimerDrivenSchedulingAgent.h
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * @file TimerDrivenSchedulingAgent.h
- * TimerDrivenSchedulingAgent class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __TIMER_DRIVEN_SCHEDULING_AGENT_H__
-#define __TIMER_DRIVEN_SCHEDULING_AGENT_H__
-
-#include "Logger.h"
-#include "Configure.h"
-#include "Processor.h"
-#include "ProcessContext.h"
-#include "SchedulingAgent.h"
-
-//! TimerDrivenSchedulingAgent Class
-class TimerDrivenSchedulingAgent : public SchedulingAgent
-{
-public:
- //! Constructor
- /*!
- * Create a new processor
- */
- TimerDrivenSchedulingAgent()
- : SchedulingAgent()
- {
- }
- //! Destructor
- virtual ~TimerDrivenSchedulingAgent()
- {
- }
- //! Run function for the thread
- static void run(TimerDrivenSchedulingAgent *agent, Processor *processor);
-
-public:
- //! schedule, overwritten by different DrivenTimerDrivenSchedulingAgent
- virtual void schedule(Processor *processor);
- //! unschedule, overwritten by different DrivenTimerDrivenSchedulingAgent
- virtual void unschedule(Processor *processor);
-
-protected:
-
-private:
- //! Threads
- std::map<std::string, std::vector<std::thread *>> _threads;
- // Prevent default copy constructor and assignment operation
- // Only support pass by reference or pointer
- TimerDrivenSchedulingAgent(const TimerDrivenSchedulingAgent &parent);
- TimerDrivenSchedulingAgent &operator=(const TimerDrivenSchedulingAgent &parent);
-
-};
-
-#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/spdlog/async_logger.h
----------------------------------------------------------------------
diff --git a/inc/spdlog/async_logger.h b/inc/spdlog/async_logger.h
deleted file mode 100644
index 517ce92..0000000
--- a/inc/spdlog/async_logger.h
+++ /dev/null
@@ -1,90 +0,0 @@
-/*************************************************************************/
-/* spdlog - an extremely fast and easy to use c++11 logging library. */
-/* Copyright (c) 2014 Gabi Melman. */
-/* */
-/* Permission is hereby granted, free of charge, to any person obtaining */
-/* a copy of this software and associated documentation files (the */
-/* "Software"), to deal in the Software without restriction, including */
-/* without limitation the rights to use, copy, modify, merge, publish, */
-/* distribute, sublicense, and/or sell copies of the Software, and to */
-/* permit persons to whom the Software is furnished to do so, subject to */
-/* the following conditions: */
-/* */
-/* The above copyright notice and this permission notice shall be */
-/* included in all copies or substantial portions of the Software. */
-/* */
-/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */
-/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */
-/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
-/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */
-/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */
-/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */
-/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */
-/*************************************************************************/
-
-#pragma once
-
-// Very fast asynchronous logger (millions of logs per second on an average desktop)
-// Uses pre allocated lockfree queue for maximum throughput even under large number of threads.
-// Creates a single back thread to pop messages from the queue and log them.
-//
-// Upon each log write the logger:
-// 1. Checks if its log level is enough to log the message
-// 2. Push a new copy of the message to a queue (or block the caller until space is available in the queue)
-// 3. will throw spdlog_ex upon log exceptions
-// Upong destruction, logs all remaining messages in the queue before destructing..
-
-#include <chrono>
-#include <functional>
-#include "common.h"
-#include "logger.h"
-#include "spdlog.h"
-
-
-namespace spdlog
-{
-
-namespace details
-{
-class async_log_helper;
-}
-
-class async_logger :public logger
-{
-public:
- template<class It>
- async_logger(const std::string& name,
- const It& begin,
- const It& end,
- size_t queue_size,
- const async_overflow_policy overflow_policy = async_overflow_policy::block_retry,
- const std::function<void()>& worker_warmup_cb = nullptr,
- const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero());
-
- async_logger(const std::string& logger_name,
- sinks_init_list sinks,
- size_t queue_size,
- const async_overflow_policy overflow_policy = async_overflow_policy::block_retry,
- const std::function<void()>& worker_warmup_cb = nullptr,
- const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero());
-
- async_logger(const std::string& logger_name,
- sink_ptr single_sink,
- size_t queue_size,
- const async_overflow_policy overflow_policy = async_overflow_policy::block_retry,
- const std::function<void()>& worker_warmup_cb = nullptr,
- const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero());
-
-
-protected:
- void _log_msg(details::log_msg& msg) override;
- void _set_formatter(spdlog::formatter_ptr msg_formatter) override;
- void _set_pattern(const std::string& pattern) override;
-
-private:
- std::unique_ptr<details::async_log_helper> _async_log_helper;
-};
-}
-
-
-#include "./details/async_logger_impl.h"
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/spdlog/common.h
----------------------------------------------------------------------
diff --git a/inc/spdlog/common.h b/inc/spdlog/common.h
deleted file mode 100644
index cde5a9e..0000000
--- a/inc/spdlog/common.h
+++ /dev/null
@@ -1,116 +0,0 @@
-/*************************************************************************/
-/* spdlog - an extremely fast and easy to use c++11 logging library. */
-/* Copyright (c) 2014 Gabi Melman. */
-/* */
-/* Permission is hereby granted, free of charge, to any person obtaining */
-/* a copy of this software and associated documentation files (the */
-/* "Software"), to deal in the Software without restriction, including */
-/* without limitation the rights to use, copy, modify, merge, publish, */
-/* distribute, sublicense, and/or sell copies of the Software, and to */
-/* permit persons to whom the Software is furnished to do so, subject to */
-/* the following conditions: */
-/* */
-/* The above copyright notice and this permission notice shall be */
-/* included in all copies or substantial portions of the Software. */
-/* */
-/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */
-/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */
-/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
-/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */
-/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */
-/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */
-/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */
-/*************************************************************************/
-
-#pragma once
-
-#include <string>
-#include <initializer_list>
-#include <chrono>
-#include <memory>
-
-//visual studio does not support noexcept yet
-#ifndef _MSC_VER
-#define SPDLOG_NOEXCEPT noexcept
-#else
-#define SPDLOG_NOEXCEPT throw()
-#endif
-
-
-namespace spdlog
-{
-
-class formatter;
-
-namespace sinks
-{
-class sink;
-}
-
-// Common types across the lib
-using log_clock = std::chrono::system_clock;
-using sink_ptr = std::shared_ptr < sinks::sink > ;
-using sinks_init_list = std::initializer_list < sink_ptr > ;
-using formatter_ptr = std::shared_ptr<spdlog::formatter>;
-
-
-//Log level enum
-namespace level
-{
-typedef enum
-{
- trace = 0,
- debug = 1,
- info = 2,
- notice = 3,
- warn = 4,
- err = 5,
- critical = 6,
- alert = 7,
- emerg = 8,
- off = 9
-} level_enum;
-
-static const char* level_names[] { "trace", "debug", "info", "notice", "warning", "error", "critical", "alert", "emerg", "off"};
-
-static const char* short_level_names[] { "T", "D", "I", "N", "W", "E", "C", "A", "M", "O"};
-
-inline const char* to_str(spdlog::level::level_enum l)
-{
- return level_names[l];
-}
-
-inline const char* to_short_str(spdlog::level::level_enum l)
-{
- return short_level_names[l];
-}
-} //level
-
-
-//
-// Async overflow policy - block by default.
-//
-enum class async_overflow_policy
-{
- block_retry, // Block / yield / sleep until message can be enqueued
- discard_log_msg // Discard the message it enqueue fails
-};
-
-
-//
-// Log exception
-//
-class spdlog_ex : public std::exception
-{
-public:
- spdlog_ex(const std::string& msg) :_msg(msg) {}
- const char* what() const SPDLOG_NOEXCEPT override
- {
- return _msg.c_str();
- }
-private:
- std::string _msg;
-
-};
-
-} //spdlog